• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4873

04 Dec 2025 01:55AM UTC coverage: 64.558% (-0.1%) from 64.678%
#4873

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

719 of 2219 new or added lines in 36 files covered. (32.4%)

6363 existing lines in 135 files now uncovered.

159381 of 246882 relevant lines covered (64.56%)

108937395.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.46
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "parser.h"
19
#include "tcol.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tdef.h"
23
#include "tglobal.h"
24
#include "tmsgtype.h"
25

26
#define RAW_NULL_CHECK(c) \
27
  do {                    \
28
    if (c == NULL) {      \
29
      code = terrno;      \
30
      goto end;           \
31
    }                     \
32
  } while (0)
33

34
#define RAW_FALSE_CHECK(c)           \
35
  do {                               \
36
    if (!c) {                        \
37
      code = TSDB_CODE_INVALID_PARA; \
38
      goto end;                      \
39
    }                                \
40
  } while (0)
41

42
#define RAW_RETURN_CHECK(c) \
43
  do {                      \
44
    code = c;               \
45
    if (code != 0) {        \
46
      goto end;             \
47
    }                       \
48
  } while (0)
49

50
#define LOG_ID_TAG   "connId:0x%" PRIx64 ", QID:0x%" PRIx64
51
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
52

53
#define TMQ_META_VERSION "1.0"
54

55
static bool tmqAddJsonObjectItem(cJSON* object, const char* string, cJSON* item) {
333,864✔
56
  bool ret = cJSON_AddItemToObject(object, string, item);
333,864✔
57
  if (!ret) {
333,864✔
58
    cJSON_Delete(item);
×
59
  }
60
  return ret;
333,864✔
61
}
62
static bool tmqAddJsonArrayItem(cJSON* array, cJSON* item) {
59,681✔
63
  bool ret = cJSON_AddItemToArray(array, item);
59,681✔
64
  if (!ret) {
59,681✔
65
    cJSON_Delete(item);
×
66
  }
67
  return ret;
59,681✔
68
}
69

70
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen);
71
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
5,748✔
72
  if (db == NULL) {
5,748✔
73
    return suid;
×
74
  }
75
  return suid + MurmurHash3_32(db, strlen(db));
5,748✔
76
}
77
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, SExtSchema* pExtSchemas, char* name, int64_t id, int8_t t,
4,801✔
78
                                 SColCmprWrapper* pColCmprRow, cJSON** pJson) {
79
  if (schemaRow == NULL || name == NULL || pColCmprRow == NULL || pJson == NULL) {
4,801✔
80
    uError("invalid parameter, schemaRow:%p, name:%p, pColCmprRow:%p, pJson:%p", schemaRow, name, pColCmprRow, pJson);
×
81
    return;
×
82
  }
83
  int32_t code = TSDB_CODE_SUCCESS;
4,801✔
84
  int8_t  buildDefaultCompress = 0;
4,801✔
85
  if (pColCmprRow->nCols <= 0) {
4,801✔
86
    buildDefaultCompress = 1;
72✔
87
  }
88

89
  char*  string = NULL;
4,801✔
90
  cJSON* json = cJSON_CreateObject();
4,801✔
91
  RAW_NULL_CHECK(json);
4,801✔
92
  cJSON* type = cJSON_CreateString("create");
4,801✔
93
  RAW_NULL_CHECK(type);
4,801✔
94

95
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
4,801✔
96
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
4,801✔
97
  RAW_NULL_CHECK(tableType);
4,801✔
98
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
4,801✔
99
  cJSON* tableName = cJSON_CreateString(name);
4,801✔
100
  RAW_NULL_CHECK(tableName);
4,801✔
101
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
4,801✔
102

103
  cJSON* columns = cJSON_CreateArray();
4,801✔
104
  RAW_NULL_CHECK(columns);
4,801✔
105
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "columns", columns));
4,801✔
106

107
  for (int i = 0; i < schemaRow->nCols; i++) {
31,972✔
108
    cJSON* column = cJSON_CreateObject();
27,171✔
109
    RAW_NULL_CHECK(column);
27,171✔
110
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(columns, column));
27,171✔
111
    SSchema* s = schemaRow->pSchema + i;
27,171✔
112
    cJSON*   cname = cJSON_CreateString(s->name);
27,171✔
113
    RAW_NULL_CHECK(cname);
27,171✔
114
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "name", cname));
27,171✔
115
    cJSON* ctype = cJSON_CreateNumber(s->type);
27,171✔
116
    RAW_NULL_CHECK(ctype);
27,171✔
117
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "type", ctype));
27,171✔
118
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
29,805✔
119
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
2,634✔
120
      cJSON*  cbytes = cJSON_CreateNumber(length);
2,634✔
121
      RAW_NULL_CHECK(cbytes);
2,634✔
122
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
2,634✔
123
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
24,537✔
124
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
408✔
125
      cJSON*  cbytes = cJSON_CreateNumber(length);
408✔
126
      RAW_NULL_CHECK(cbytes);
408✔
127
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
408✔
128
    } else if (IS_STR_DATA_BLOB(s->type)) {
24,129✔
129
      int32_t length = s->bytes - BLOBSTR_HEADER_SIZE;
×
130
      cJSON*  cbytes = cJSON_CreateNumber(length);
×
131
      RAW_NULL_CHECK(cbytes);
×
132
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
×
133
    } else if (s->type == TSDB_DATA_TYPE_DECIMAL || s->type == TSDB_DATA_TYPE_DECIMAL64) {
24,129✔
134
      int32_t length = pExtSchemas[i].typeMod;
199✔
135
      cJSON*  cbytes = cJSON_CreateNumber(length);
199✔
136
      RAW_NULL_CHECK(cbytes);
199✔
137
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
199✔
138
    }
139
    cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
27,171✔
140
    RAW_NULL_CHECK(isPk);
27,171✔
141
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "isPrimarykey", isPk));
27,171✔
142

143
    if (pColCmprRow == NULL) {
27,171✔
144
      continue;
×
145
    }
146

147
    uint32_t alg = 0;
27,171✔
148
    if (buildDefaultCompress) {
27,171✔
149
      alg = createDefaultColCmprByType(s->type);
144✔
150
    } else {
151
      SColCmpr* pColCmpr = pColCmprRow->pColCmpr + i;
27,027✔
152
      alg = pColCmpr->alg;
27,027✔
153
    }
154
    const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(alg));
27,171✔
155
    RAW_NULL_CHECK(encode);
27,171✔
156
    const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(alg));
27,171✔
157
    RAW_NULL_CHECK(compress);
27,171✔
158
    const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(alg));
27,171✔
159
    RAW_NULL_CHECK(level);
27,171✔
160

161
    cJSON* encodeJson = cJSON_CreateString(encode);
27,171✔
162
    RAW_NULL_CHECK(encodeJson);
27,171✔
163
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "encode", encodeJson));
27,171✔
164

165
    cJSON* compressJson = cJSON_CreateString(compress);
27,171✔
166
    RAW_NULL_CHECK(compressJson);
27,171✔
167
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "compress", compressJson));
27,171✔
168

169
    cJSON* levelJson = cJSON_CreateString(level);
27,171✔
170
    RAW_NULL_CHECK(levelJson);
27,171✔
171
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "level", levelJson));
27,171✔
172
  }
173

174
  cJSON* tags = cJSON_CreateArray();
4,801✔
175
  RAW_NULL_CHECK(tags);
4,801✔
176
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
4,801✔
177

178
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
13,804✔
179
    cJSON* tag = cJSON_CreateObject();
9,003✔
180
    RAW_NULL_CHECK(tag);
9,003✔
181
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
9,003✔
182
    SSchema* s = schemaTag->pSchema + i;
9,003✔
183
    cJSON*   tname = cJSON_CreateString(s->name);
9,003✔
184
    RAW_NULL_CHECK(tname);
9,003✔
185
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
9,003✔
186
    cJSON* ttype = cJSON_CreateNumber(s->type);
9,003✔
187
    RAW_NULL_CHECK(ttype);
9,003✔
188
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
9,003✔
189
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
9,592✔
190
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
589✔
191
      cJSON*  cbytes = cJSON_CreateNumber(length);
589✔
192
      RAW_NULL_CHECK(cbytes);
589✔
193
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
589✔
194
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
8,414✔
195
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
2,563✔
196
      cJSON*  cbytes = cJSON_CreateNumber(length);
2,563✔
197
      RAW_NULL_CHECK(cbytes);
2,563✔
198
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
2,563✔
199
    } else if (IS_STR_DATA_BLOB(s->type)) {
5,851✔
200
      int32_t length = s->bytes - BLOBSTR_HEADER_SIZE;
×
201
      cJSON*  cbytes = cJSON_CreateNumber(length);
×
202
      RAW_NULL_CHECK(cbytes);
×
203
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
×
204
    }
205
  }
206

207
end:
4,801✔
208
  *pJson = json;
4,801✔
209
}
210

211
static int32_t setCompressOption(cJSON* json, uint32_t para) {
×
212
  if (json == NULL) {
×
213
    return TSDB_CODE_INVALID_PARA;
×
214
  }
215
  uint8_t encode = COMPRESS_L1_TYPE_U32(para);
×
216
  int32_t code = 0;
×
217
  if (encode != 0) {
×
218
    const char* encodeStr = columnEncodeStr(encode);
×
219
    RAW_NULL_CHECK(encodeStr);
×
220
    cJSON* encodeJson = cJSON_CreateString(encodeStr);
×
221
    RAW_NULL_CHECK(encodeJson);
×
222
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "encode", encodeJson));
×
223
    return code;
×
224
  }
225
  uint8_t compress = COMPRESS_L2_TYPE_U32(para);
×
226
  if (compress != 0) {
×
227
    const char* compressStr = columnCompressStr(compress);
×
228
    RAW_NULL_CHECK(compressStr);
×
229
    cJSON* compressJson = cJSON_CreateString(compressStr);
×
230
    RAW_NULL_CHECK(compressJson);
×
231
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "compress", compressJson));
×
232
    return code;
×
233
  }
234
  uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para);
×
235
  if (level != 0) {
×
236
    const char* levelStr = columnLevelStr(level);
×
237
    RAW_NULL_CHECK(levelStr);
×
238
    cJSON* levelJson = cJSON_CreateString(levelStr);
×
239
    RAW_NULL_CHECK(levelJson);
×
240
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "level", levelJson));
×
241
    return code;
×
242
  }
243

244
end:
×
245
  return code;
×
246
}
247
static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) {
2,586✔
248
  if (alterData == NULL || pJson == NULL) {
2,586✔
NEW
249
    uError("invalid parameter in %s alterData:%p", __func__, alterData);
×
250
    return;
×
251
  }
252
  SMAlterStbReq req = {0};
2,586✔
253
  cJSON*        json = NULL;
2,586✔
254
  char*         string = NULL;
2,586✔
255
  int32_t       code = 0;
2,586✔
256

257
  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) {
2,586✔
258
    goto end;
×
259
  }
260

261
  json = cJSON_CreateObject();
2,586✔
262
  RAW_NULL_CHECK(json);
2,586✔
263
  cJSON* type = cJSON_CreateString("alter");
2,586✔
264
  RAW_NULL_CHECK(type);
2,586✔
265
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
2,586✔
266
  SName name = {0};
2,586✔
267
  RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
2,586✔
268
  cJSON* tableType = cJSON_CreateString("super");
2,586✔
269
  RAW_NULL_CHECK(tableType);
2,586✔
270
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
2,586✔
271
  cJSON* tableName = cJSON_CreateString(name.tname);
2,586✔
272
  RAW_NULL_CHECK(tableName);
2,586✔
273
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
2,586✔
274

275
  cJSON* alterType = cJSON_CreateNumber(req.alterType);
2,586✔
276
  RAW_NULL_CHECK(alterType);
2,586✔
277
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));
2,586✔
278
  switch (req.alterType) {
2,586✔
279
    case TSDB_ALTER_TABLE_ADD_TAG:
1,509✔
280
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
281
      if (taosArrayGetSize(req.pFields) != 1) {
1,509✔
NEW
282
        uError("invalid field num %" PRIzu " for alter type %d", taosArrayGetSize(req.pFields), req.alterType);
×
NEW
283
        cJSON_Delete(json);
×
NEW
284
        json = NULL;
×
NEW
285
        goto end;
×
286
      }
287
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
1,509✔
288
      RAW_NULL_CHECK(field);
1,509✔
289
      cJSON* colName = cJSON_CreateString(field->name);
1,509✔
290
      RAW_NULL_CHECK(colName);
1,509✔
291
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
1,509✔
292
      cJSON* colType = cJSON_CreateNumber(field->type);
1,509✔
293
      RAW_NULL_CHECK(colType);
1,509✔
294
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
1,509✔
295

296
      if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
1,509✔
297
          field->type == TSDB_DATA_TYPE_GEOMETRY) {
1,509✔
298
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
574✔
299
        cJSON*  cbytes = cJSON_CreateNumber(length);
574✔
300
        RAW_NULL_CHECK(cbytes);
574✔
301
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
574✔
302
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
935✔
303
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
71✔
304
        cJSON*  cbytes = cJSON_CreateNumber(length);
71✔
305
        RAW_NULL_CHECK(cbytes);
71✔
306
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
71✔
307
      } else if (IS_STR_DATA_BLOB(field->type)) {
864✔
UNCOV
308
        int32_t length = field->bytes - BLOBSTR_HEADER_SIZE;
×
UNCOV
309
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
UNCOV
310
        RAW_NULL_CHECK(cbytes);
×
UNCOV
311
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
312
      }
313
      break;
1,509✔
314
    }
315
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
×
316
      SFieldWithOptions* field = taosArrayGet(req.pFields, 0);
×
317
      RAW_NULL_CHECK(field);
×
UNCOV
318
      cJSON* colName = cJSON_CreateString(field->name);
×
UNCOV
319
      RAW_NULL_CHECK(colName);
×
UNCOV
320
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
×
321
      cJSON* colType = cJSON_CreateNumber(field->type);
×
322
      RAW_NULL_CHECK(colType);
×
323
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
×
324

325
      if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
×
326
          field->type == TSDB_DATA_TYPE_GEOMETRY) {
×
327
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
×
328
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
329
        RAW_NULL_CHECK(cbytes);
×
UNCOV
330
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
331
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
×
332
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
×
333
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
334
        RAW_NULL_CHECK(cbytes);
×
335
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
336
      } else if (IS_STR_DATA_BLOB(field->type)) {
×
337
        int32_t length = field->bytes - BLOBSTR_HEADER_SIZE;
×
338
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
339
        RAW_NULL_CHECK(cbytes);
×
340
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
341
      }
342

343
      RAW_RETURN_CHECK(setCompressOption(json, field->compress));
×
344
      break;
×
345
    }
346
    case TSDB_ALTER_TABLE_DROP_TAG:
432✔
347
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
348
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
432✔
349
      RAW_NULL_CHECK(field);
432✔
350
      cJSON* colName = cJSON_CreateString(field->name);
432✔
351
      RAW_NULL_CHECK(colName);
432✔
352
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
432✔
353
      break;
432✔
354
    }
355
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
645✔
356
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
357
      if (taosArrayGetSize(req.pFields) != 1) {
645✔
NEW
358
        uError("invalid field num %" PRIzu " for alter type %d", taosArrayGetSize(req.pFields), req.alterType);
×
NEW
359
        cJSON_Delete(json);
×
NEW
360
        json = NULL;
×
NEW
361
        goto end;
×
362
      }
363
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
645✔
364
      RAW_NULL_CHECK(field);
645✔
365
      cJSON* colName = cJSON_CreateString(field->name);
645✔
366
      RAW_NULL_CHECK(colName);
645✔
367
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
645✔
368
      cJSON* colType = cJSON_CreateNumber(field->type);
645✔
369
      RAW_NULL_CHECK(colType);
645✔
370
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
645✔
371
      if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
645✔
372
          field->type == TSDB_DATA_TYPE_GEOMETRY) {
645✔
373
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
574✔
374
        cJSON*  cbytes = cJSON_CreateNumber(length);
574✔
375
        RAW_NULL_CHECK(cbytes);
574✔
376
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
574✔
377
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
71✔
378
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
71✔
379
        cJSON*  cbytes = cJSON_CreateNumber(length);
71✔
380
        RAW_NULL_CHECK(cbytes);
71✔
381
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
71✔
382
      }
383
      break;
645✔
384
    }
UNCOV
385
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
×
386
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
UNCOV
387
      TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0);
×
UNCOV
388
      RAW_NULL_CHECK(oldField);
×
UNCOV
389
      TAOS_FIELD* newField = taosArrayGet(req.pFields, 1);
×
UNCOV
390
      RAW_NULL_CHECK(newField);
×
UNCOV
391
      cJSON* colName = cJSON_CreateString(oldField->name);
×
UNCOV
392
      RAW_NULL_CHECK(colName);
×
UNCOV
393
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
×
UNCOV
394
      cJSON* colNewName = cJSON_CreateString(newField->name);
×
UNCOV
395
      RAW_NULL_CHECK(colNewName);
×
UNCOV
396
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName));
×
397
      break;
×
398
    }
399
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
×
400
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
×
401
      RAW_NULL_CHECK(field);
×
402
      cJSON* colName = cJSON_CreateString(field->name);
×
403
      RAW_NULL_CHECK(colName);
×
404
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
×
405
      RAW_RETURN_CHECK(setCompressOption(json, field->bytes));
×
406
      break;
×
407
    }
408
    default:
×
409
      break;
×
410
  }
411

412
end:
2,586✔
413
  tFreeSMAltertbReq(&req);
2,586✔
414
  *pJson = json;
2,586✔
415
}
416

417
static void processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
3,775✔
418
  if (metaRsp == NULL || pJson == NULL) {
3,775✔
UNCOV
419
    uError("invalid parameter in %s", __func__);
×
420
    return;
×
421
  }
422
  SVCreateStbReq req = {0};
3,775✔
423
  SDecoder       coder = {0};
3,775✔
424

425
  uDebug("create stable data:%p", metaRsp);
3,775✔
426
  // decode and process req
427
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
3,775✔
428
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
3,775✔
429
  tDecoderInit(&coder, data, len);
3,775✔
430

431
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
3,775✔
432
    goto end;
×
433
  }
434
  buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.pExtSchemas, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr, pJson);
3,775✔
435

436
end:
3,775✔
437
  uDebug("create stable return");
3,775✔
438
  tDecoderClear(&coder);
3,775✔
439
}
440

441
static void processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
2,586✔
442
  if (metaRsp == NULL || pJson == NULL) {
2,586✔
UNCOV
443
    uError("invalid parameter in %s", __func__);
×
444
    return;
×
445
  }
446
  SVCreateStbReq req = {0};
2,586✔
447
  SDecoder       coder = {0};
2,586✔
448
  uDebug("alter stable data:%p", metaRsp);
2,586✔
449

450
  // decode and process req
451
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
2,586✔
452
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
2,586✔
453
  tDecoderInit(&coder, data, len);
2,586✔
454

455
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
2,586✔
456
    goto end;
×
457
  }
458
  buildAlterSTableJson(req.alterOriData, req.alterOriDataLen, pJson);
2,586✔
459

460
end:
2,586✔
461
  uDebug("alter stable return");
2,586✔
462
  tDecoderClear(&coder);
2,586✔
463
}
464

465
static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
7,380✔
466
  if (json == NULL || pCreateReq == NULL) {
7,380✔
UNCOV
467
    uError("invalid parameter in %s", __func__);
×
468
    return;
×
469
  }
470
  STag*   pTag = (STag*)pCreateReq->ctb.pTag;
7,380✔
471
  char*   sname = pCreateReq->ctb.stbName;
7,380✔
472
  char*   name = pCreateReq->name;
7,380✔
473
  SArray* tagName = pCreateReq->ctb.tagName;
7,380✔
474
  int64_t id = pCreateReq->uid;
7,380✔
475
  uint8_t tagNum = pCreateReq->ctb.tagNum;
7,380✔
476
  int32_t code = 0;
7,380✔
477
  SArray* pTagVals = NULL;
7,380✔
478
  char*   pJson = NULL;
7,380✔
479

480
  cJSON* tableName = cJSON_CreateString(name);
7,380✔
481
  RAW_NULL_CHECK(tableName);
7,380✔
482
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
7,380✔
483
  cJSON* using = cJSON_CreateString(sname);
7,380✔
484
  RAW_NULL_CHECK(using);
7,380✔
485
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "using", using));
7,380✔
486
  cJSON* tagNumJson = cJSON_CreateNumber(tagNum);
7,380✔
487
  RAW_NULL_CHECK(tagNumJson);
7,380✔
488
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tagNum", tagNumJson));
7,380✔
489

490
  cJSON* tags = cJSON_CreateArray();
7,380✔
491
  RAW_NULL_CHECK(tags);
7,380✔
492
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
7,380✔
493
  RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals));
7,380✔
494
  if (tTagIsJson(pTag)) {
7,380✔
495
    STag* p = (STag*)pTag;
948✔
496
    if (p->nTag == 0) {
948✔
497
      uError("p->nTag == 0");
474✔
498
      goto end;
474✔
499
    }
500
    parseTagDatatoJson(pTag, &pJson, NULL);
474✔
501
    RAW_NULL_CHECK(pJson);
474✔
502
    cJSON* tag = cJSON_CreateObject();
474✔
503
    RAW_NULL_CHECK(tag);
474✔
504
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
474✔
505
    STagVal* pTagVal = taosArrayGet(pTagVals, 0);
474✔
506
    RAW_NULL_CHECK(pTagVal);
474✔
507
    char* ptname = taosArrayGet(tagName, 0);
474✔
508
    RAW_NULL_CHECK(ptname);
474✔
509
    cJSON* tname = cJSON_CreateString(ptname);
474✔
510
    RAW_NULL_CHECK(tname);
474✔
511
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
474✔
512
    cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
474✔
513
    RAW_NULL_CHECK(ttype);
474✔
514
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
474✔
515
    cJSON* tvalue = cJSON_CreateString(pJson);
474✔
516
    RAW_NULL_CHECK(tvalue);
474✔
517
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
474✔
518
    goto end;
474✔
519
  }
520

521
  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
21,879✔
522
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
15,447✔
523
    RAW_NULL_CHECK(pTagVal);
15,447✔
524
    cJSON* tag = cJSON_CreateObject();
15,447✔
525
    RAW_NULL_CHECK(tag);
15,447✔
526
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
15,447✔
527
    char* ptname = taosArrayGet(tagName, i);
15,447✔
528
    RAW_NULL_CHECK(ptname);
15,447✔
529
    cJSON* tname = cJSON_CreateString(ptname);
15,447✔
530
    RAW_NULL_CHECK(tname);
15,447✔
531
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
15,447✔
532
    cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
15,447✔
533
    RAW_NULL_CHECK(ttype);
15,447✔
534
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
15,447✔
535

536
    cJSON* tvalue = NULL;
15,447✔
537
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
20,608✔
538
      if (IS_STR_DATA_BLOB(pTagVal->type)) {
5,161✔
UNCOV
539
        goto end;
×
540
      }
541
      int64_t bufSize = 0;
5,161✔
542
      if (pTagVal->type == TSDB_DATA_TYPE_VARBINARY) {
5,161✔
UNCOV
543
        bufSize = pTagVal->nData * 2 + 2 + 3;
×
544
      } else {
545
        bufSize = pTagVal->nData + 3;
5,161✔
546
      }
547
      char* buf = taosMemoryCalloc(bufSize, 1);
5,161✔
548
      RAW_NULL_CHECK(buf);
5,161✔
549
      if (dataConverToStr(buf, bufSize, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL) != TSDB_CODE_SUCCESS) {
5,161✔
UNCOV
550
        taosMemoryFree(buf);
×
551
        goto end;
×
552
      }
553

554
      tvalue = cJSON_CreateString(buf);
5,161✔
555
      taosMemoryFree(buf);
5,161✔
556
      RAW_NULL_CHECK(tvalue);
5,161✔
557
    } else {
558
      double val = 0;
10,286✔
559
      GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64,
10,286✔
560
                     0);  // currently tag type can't be decimal, so pass 0 as typeMod
561
      tvalue = cJSON_CreateNumber(val);
10,286✔
562
      RAW_NULL_CHECK(tvalue);
10,286✔
563
    }
564

565
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
15,447✔
566
  }
567

568
end:
7,380✔
569
  taosMemoryFree(pJson);
7,380✔
570
  taosArrayDestroy(pTagVals);
7,380✔
571
}
572

573
static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) {
5,670✔
574
  if (pJson == NULL || pCreateReq == NULL) {
5,670✔
UNCOV
575
    uError("invalid parameter in %s", __func__);
×
UNCOV
576
    return;
×
577
  }
578
  int32_t code = 0;
5,670✔
579
  char*   string = NULL;
5,670✔
580
  cJSON*  json = cJSON_CreateObject();
5,670✔
581
  RAW_NULL_CHECK(json);
5,670✔
582
  cJSON* type = cJSON_CreateString("create");
5,670✔
583
  RAW_NULL_CHECK(type);
5,670✔
584
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
5,670✔
585

586
  cJSON* tableType = cJSON_CreateString("child");
5,670✔
587
  RAW_NULL_CHECK(tableType);
5,670✔
588
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
5,670✔
589

590
  buildChildElement(json, pCreateReq);
5,670✔
591
  cJSON* createList = cJSON_CreateArray();
5,670✔
592
  RAW_NULL_CHECK(createList);
5,670✔
593
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "createList", createList));
5,670✔
594

595
  for (int i = 0; nReqs > 1 && i < nReqs; i++) {
7,380✔
596
    cJSON* create = cJSON_CreateObject();
1,710✔
597
    RAW_NULL_CHECK(create);
1,710✔
598
    buildChildElement(create, pCreateReq + i);
1,710✔
599
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(createList, create));
1,710✔
600
  }
601

602
end:
5,670✔
603
  *pJson = json;
5,670✔
604
}
605

606
static void processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
6,301✔
607
  if (pJson == NULL || metaRsp == NULL) {
6,301✔
UNCOV
608
    uError("invalid parameter in %s", __func__);
×
UNCOV
609
    return;
×
610
  }
611
  SDecoder           decoder = {0};
6,301✔
612
  SVCreateTbBatchReq req = {0};
6,301✔
613
  SVCreateTbReq*     pCreateReq;
614
  // decode
615
  uDebug("create table data:%p", metaRsp);
6,301✔
616
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
6,301✔
617
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
6,301✔
618
  tDecoderInit(&decoder, data, len);
6,301✔
619
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
6,301✔
620
    goto end;
×
621
  }
622

623
  // loop to create table
624
  if (req.nReqs > 0) {
6,301✔
625
    pCreateReq = req.pReqs;
6,301✔
626
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
6,301✔
627
      buildCreateCTableJson(req.pReqs, req.nReqs, pJson);
5,347✔
628
    } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
954✔
629
      buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->pExtSchemas, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE,
954✔
630
                           &pCreateReq->colCmpr, pJson);
631
    }
632
  }
633

634
end:
6,301✔
635
  uDebug("create table return");
6,301✔
636
  tDeleteSVCreateTbBatchReq(&req);
6,301✔
637
  tDecoderClear(&decoder);
6,301✔
638
}
639

640
static void processAutoCreateTable(SMqDataRsp* rsp, char** string) {
395✔
641
  if (rsp == NULL || string == NULL) {
395✔
UNCOV
642
    uError("invalid parameter in %s", __func__);
×
UNCOV
643
    return;
×
644
  }
645
  SDecoder*      decoder = NULL;
395✔
646
  SVCreateTbReq* pCreateReq = NULL;
395✔
647
  int32_t        code = 0;
395✔
648
  uDebug("auto create table data:%p", rsp);
395✔
649
  if (rsp->createTableNum <= 0) {
395✔
UNCOV
650
    uError("processAutoCreateTable rsp->createTableNum <= 0");
×
UNCOV
651
    goto end;
×
652
  }
653

654
  decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
395✔
655
  RAW_NULL_CHECK(decoder);
395✔
656
  pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
395✔
657
  RAW_NULL_CHECK(pCreateReq);
395✔
658

659
  // loop to create table
660
  for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) {
1,194✔
661
    // decode
662
    void** data = taosArrayGet(rsp->createTableReq, iReq);
799✔
663
    RAW_NULL_CHECK(data);
799✔
664
    int32_t* len = taosArrayGet(rsp->createTableLen, iReq);
799✔
665
    RAW_NULL_CHECK(len);
799✔
666
    tDecoderInit(&decoder[iReq], *data, *len);
799✔
667
    if (tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq) < 0) {
799✔
UNCOV
668
      goto end;
×
669
    }
670

671
    if (pCreateReq[iReq].type != TSDB_CHILD_TABLE && pCreateReq[iReq].type != TSDB_NORMAL_TABLE) {
799✔
UNCOV
672
      uError("processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE");
×
UNCOV
673
      goto end;
×
674
    }
675
  }
676
  cJSON* pJson = NULL;
395✔
677
  if (pCreateReq->type == TSDB_NORMAL_TABLE) {
395✔
678
    buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->pExtSchemas, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE,
72✔
679
                         &pCreateReq->colCmpr, &pJson);
680
  } else if (pCreateReq->type == TSDB_CHILD_TABLE) {
323✔
681
    buildCreateCTableJson(pCreateReq, rsp->createTableNum, &pJson);
323✔
682
  }
683

684
  *string = cJSON_PrintUnformatted(pJson);
395✔
685
  cJSON_Delete(pJson);
395✔
686

687
end:
395✔
688
  uDebug("auto created table return, sql json:%s", *string);
395✔
689
  for (int i = 0; decoder && pCreateReq && i < rsp->createTableNum; i++) {
1,194✔
690
    tDecoderClear(&decoder[i]);
799✔
691
    taosMemoryFreeClear(pCreateReq[i].comment);
799✔
692
    if (pCreateReq[i].type == TSDB_CHILD_TABLE) {
799✔
693
      taosArrayDestroy(pCreateReq[i].ctb.tagName);
727✔
694
    }
695
  }
696
  taosMemoryFree(decoder);
395✔
697
  taosMemoryFree(pCreateReq);
395✔
698
}
699

700
static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
1,487✔
701
  if (pJson == NULL || metaRsp == NULL) {
1,487✔
UNCOV
702
    uError("invalid parameter in %s", __func__);
×
UNCOV
703
    return;
×
704
  }
705
  SDecoder     decoder = {0};
1,487✔
706
  SVAlterTbReq vAlterTbReq = {0};
1,487✔
707
  char*        string = NULL;
1,487✔
708
  cJSON*       json = NULL;
1,487✔
709
  int32_t      code = 0;
1,487✔
710

711
  uDebug("alter table data:%p", metaRsp);
1,487✔
712
  // decode
713
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
1,487✔
714
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
1,487✔
715
  tDecoderInit(&decoder, data, len);
1,487✔
716
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
1,487✔
UNCOV
717
    uError("tDecodeSVAlterTbReq error");
×
UNCOV
718
    goto end;
×
719
  }
720

721
  json = cJSON_CreateObject();
1,487✔
722
  RAW_NULL_CHECK(json);
1,487✔
723
  cJSON* type = cJSON_CreateString("alter");
1,487✔
724
  RAW_NULL_CHECK(type);
1,487✔
725
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
1,487✔
726
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
2,677✔
727
                                                vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL
1,190✔
728
                                            ? "child"
729
                                            : "normal");
730
  RAW_NULL_CHECK(tableType);
1,487✔
731
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
1,487✔
732
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
1,487✔
733
  RAW_NULL_CHECK(tableName);
1,487✔
734
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
1,487✔
735
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
1,487✔
736
  RAW_NULL_CHECK(alterType);
1,487✔
737
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));
1,487✔
738

739
  switch (vAlterTbReq.action) {
1,487✔
740
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
238✔
741
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
238✔
742
      RAW_NULL_CHECK(colName);
238✔
743
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
238✔
744
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
238✔
745
      RAW_NULL_CHECK(colType);
238✔
746
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
238✔
747

748
      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
238✔
749
          vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
238✔
UNCOV
750
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
×
UNCOV
751
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
UNCOV
752
        RAW_NULL_CHECK(cbytes);
×
UNCOV
753
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
754
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
238✔
UNCOV
755
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
×
UNCOV
756
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
UNCOV
757
        RAW_NULL_CHECK(cbytes);
×
UNCOV
758
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
759
      }
760
      break;
238✔
761
    }
762
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
×
763
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
×
764
      RAW_NULL_CHECK(colName);
×
765
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
×
UNCOV
766
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
×
767
      RAW_NULL_CHECK(colType);
×
768
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
×
769

770
      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
×
UNCOV
771
          vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
×
UNCOV
772
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
×
UNCOV
773
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
774
        RAW_NULL_CHECK(cbytes);
×
775
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
776
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
×
777
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
×
778
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
779
        RAW_NULL_CHECK(cbytes);
×
780
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
781
      }
782
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
×
783
      break;
×
784
    }
785
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
238✔
786
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
238✔
787
      RAW_NULL_CHECK(colName);
238✔
788
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
238✔
789
      break;
238✔
790
    }
791
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
238✔
792
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
238✔
793
      RAW_NULL_CHECK(colName);
238✔
794
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
238✔
795
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
238✔
796
      RAW_NULL_CHECK(colType);
238✔
797
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
238✔
798
      if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_VARBINARY ||
238✔
799
          vAlterTbReq.colModType == TSDB_DATA_TYPE_GEOMETRY) {
238✔
UNCOV
800
        int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
×
UNCOV
801
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
UNCOV
802
        RAW_NULL_CHECK(cbytes);
×
UNCOV
803
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
804
      } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
238✔
805
        int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
238✔
806
        cJSON*  cbytes = cJSON_CreateNumber(length);
238✔
807
        RAW_NULL_CHECK(cbytes);
238✔
808
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
238✔
809
      }
810
      break;
238✔
811
    }
812
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
238✔
813
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
238✔
814
      RAW_NULL_CHECK(colName);
238✔
815
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
238✔
816
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
238✔
817
      RAW_NULL_CHECK(colNewName);
238✔
818
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName));
238✔
819
      break;
238✔
820
    }
821
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
297✔
822
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
297✔
823
      RAW_NULL_CHECK(tagName);
297✔
824
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", tagName));
297✔
825

826
      bool isNull = vAlterTbReq.isNull;
297✔
827
      if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
297✔
UNCOV
828
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
×
UNCOV
829
        if (jsonTag->nTag == 0) isNull = true;
×
830
      }
831
      if (!isNull) {
297✔
832
        char* buf = NULL;
297✔
833

834
        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
297✔
UNCOV
835
          if (!tTagIsJson(vAlterTbReq.pTagVal)) {
×
UNCOV
836
            uError("processAlterTable isJson false");
×
UNCOV
837
            goto end;
×
838
          }
UNCOV
839
          parseTagDatatoJson(vAlterTbReq.pTagVal, &buf, NULL);
×
840
          if (buf == NULL) {
×
841
            uError("parseTagDatatoJson failed, buf == NULL");
×
UNCOV
842
            goto end;
×
843
          }
844
        } else {
845
          int64_t bufSize = 0;
297✔
846
          if (vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY) {
297✔
847
            bufSize = vAlterTbReq.nTagVal * 2 + 2 + 3;
×
848
          } else {
849
            bufSize = vAlterTbReq.nTagVal + 32;
297✔
850
          }
851
          buf = taosMemoryCalloc(bufSize, 1);
297✔
852
          RAW_NULL_CHECK(buf);
297✔
853
          if (dataConverToStr(buf, bufSize, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL) !=
297✔
854
              TSDB_CODE_SUCCESS) {
UNCOV
855
            taosMemoryFree(buf);
×
UNCOV
856
            goto end;
×
857
          }
858
        }
859

860
        cJSON* colValue = cJSON_CreateString(buf);
297✔
861
        taosMemoryFree(buf);
297✔
862
        RAW_NULL_CHECK(colValue);
297✔
863
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValue", colValue));
297✔
864
      }
865

866
      cJSON* isNullCJson = cJSON_CreateBool(isNull);
297✔
867
      RAW_NULL_CHECK(isNullCJson);
297✔
868
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValueNull", isNullCJson));
297✔
869
      break;
297✔
870
    }
UNCOV
871
    case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: {
×
UNCOV
872
      int32_t nTags = taosArrayGetSize(vAlterTbReq.pMultiTag);
×
UNCOV
873
      if (nTags <= 0) {
×
UNCOV
874
        uError("processAlterTable parse multi tags error");
×
UNCOV
875
        goto end;
×
876
      }
877

UNCOV
878
      cJSON* tags = cJSON_CreateArray();
×
UNCOV
879
      RAW_NULL_CHECK(tags);
×
UNCOV
880
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
×
881

UNCOV
882
      for (int32_t i = 0; i < nTags; i++) {
×
883
        cJSON* member = cJSON_CreateObject();
×
884
        RAW_NULL_CHECK(member);
×
885
        RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, member));
×
886

887
        SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
×
UNCOV
888
        cJSON*             tagName = cJSON_CreateString(pTagVal->tagName);
×
UNCOV
889
        RAW_NULL_CHECK(tagName);
×
890
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colName", tagName));
×
891

892
        if (pTagVal->tagType == TSDB_DATA_TYPE_JSON) {
×
UNCOV
893
          uError("processAlterTable isJson false");
×
894
          goto end;
×
895
        }
896
        bool isNull = pTagVal->isNull;
×
897
        if (!isNull) {
×
UNCOV
898
          int64_t bufSize = 0;
×
899
          if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
×
900
            bufSize = pTagVal->nTagVal * 2 + 2 + 3;
×
901
          } else {
902
            bufSize = pTagVal->nTagVal + 3;
×
903
          }
904
          char* buf = taosMemoryCalloc(bufSize, 1);
×
905
          RAW_NULL_CHECK(buf);
×
906
          if (dataConverToStr(buf, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL) !=
×
907
              TSDB_CODE_SUCCESS) {
908
            taosMemoryFree(buf);
×
909
            goto end;
×
910
          }
911
          cJSON* colValue = cJSON_CreateString(buf);
×
912
          taosMemoryFree(buf);
×
UNCOV
913
          RAW_NULL_CHECK(colValue);
×
914
          RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValue", colValue));
×
915
        }
916
        cJSON* isNullCJson = cJSON_CreateBool(isNull);
×
917
        RAW_NULL_CHECK(isNullCJson);
×
918
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValueNull", isNullCJson));
×
919
      }
920
      break;
×
921
    }
922

923
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
×
924
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
×
925
      RAW_NULL_CHECK(colName);
×
926
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
×
UNCOV
927
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
×
928
      break;
×
929
    }
930
    default:
238✔
931
      break;
238✔
932
  }
933

934
end:
1,487✔
935
  uDebug("alter table return");
1,487✔
936
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
1,487✔
937
    taosArrayDestroy(vAlterTbReq.pMultiTag);
×
938
  }
939
  tDecoderClear(&decoder);
1,487✔
940
  *pJson = json;
1,487✔
941
}
942

943
static void processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
400✔
944
  if (pJson == NULL || metaRsp == NULL) {
400✔
UNCOV
945
    uError("invalid parameter in %s", __func__);
×
UNCOV
946
    return;
×
947
  }
948
  SDecoder     decoder = {0};
400✔
949
  SVDropStbReq req = {0};
400✔
950
  cJSON*       json = NULL;
400✔
951
  int32_t      code = 0;
400✔
952

953
  uDebug("processDropSTable data:%p", metaRsp);
400✔
954

955
  // decode
956
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
400✔
957
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
400✔
958
  tDecoderInit(&decoder, data, len);
400✔
959
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
400✔
UNCOV
960
    uError("tDecodeSVDropStbReq failed");
×
UNCOV
961
    goto end;
×
962
  }
963

964
  json = cJSON_CreateObject();
400✔
965
  RAW_NULL_CHECK(json);
400✔
966
  cJSON* type = cJSON_CreateString("drop");
400✔
967
  RAW_NULL_CHECK(type);
400✔
968
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
400✔
969
  cJSON* tableType = cJSON_CreateString("super");
400✔
970
  RAW_NULL_CHECK(tableType);
400✔
971
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
400✔
972
  cJSON* tableName = cJSON_CreateString(req.name);
400✔
973
  RAW_NULL_CHECK(tableName);
400✔
974
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
400✔
975

976
end:
400✔
977
  uDebug("processDropSTable return");
400✔
978
  tDecoderClear(&decoder);
400✔
979
  *pJson = json;
400✔
980
}
981
static void processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
167✔
982
  if (pJson == NULL || metaRsp == NULL) {
167✔
UNCOV
983
    uError("invalid parameter in %s", __func__);
×
UNCOV
984
    return;
×
985
  }
986
  SDeleteRes req = {0};
167✔
987
  SDecoder   coder = {0};
167✔
988
  cJSON*     json = NULL;
167✔
989
  int32_t    code = 0;
167✔
990

991
  uDebug("processDeleteTable data:%p", metaRsp);
167✔
992
  // decode and process req
993
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
167✔
994
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
167✔
995

996
  tDecoderInit(&coder, data, len);
167✔
997
  if (tDecodeDeleteRes(&coder, &req) < 0) {
167✔
UNCOV
998
    uError("tDecodeDeleteRes failed");
×
UNCOV
999
    goto end;
×
1000
  }
1001

1002
  //  getTbName(req.tableFName);
1003
  char sql[256] = {0};
167✔
1004
  (void)snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
167✔
1005
                 req.tsColName, req.skey, req.tsColName, req.ekey);
1006

1007
  json = cJSON_CreateObject();
167✔
1008
  RAW_NULL_CHECK(json);
167✔
1009
  cJSON* type = cJSON_CreateString("delete");
167✔
1010
  RAW_NULL_CHECK(type);
167✔
1011
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
167✔
1012
  cJSON* sqlJson = cJSON_CreateString(sql);
167✔
1013
  RAW_NULL_CHECK(sqlJson);
167✔
1014
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "sql", sqlJson));
167✔
1015

1016
end:
167✔
1017
  uDebug("processDeleteTable return");
167✔
1018
  tDecoderClear(&coder);
167✔
1019
  *pJson = json;
167✔
1020
}
1021

1022
static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
271✔
1023
  if (pJson == NULL || metaRsp == NULL) {
271✔
UNCOV
1024
    uError("invalid parameter in %s", __func__);
×
UNCOV
1025
    return;
×
1026
  }
1027
  SDecoder         decoder = {0};
271✔
1028
  SVDropTbBatchReq req = {0};
271✔
1029
  cJSON*           json = NULL;
271✔
1030
  int32_t          code = 0;
271✔
1031

1032
  uDebug("processDropTable data:%p", metaRsp);
271✔
1033
  // decode
1034
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
271✔
1035
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
271✔
1036
  tDecoderInit(&decoder, data, len);
271✔
1037
  if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
271✔
UNCOV
1038
    uError("tDecodeSVDropTbBatchReq failed");
×
UNCOV
1039
    goto end;
×
1040
  }
1041

1042
  json = cJSON_CreateObject();
271✔
1043
  RAW_NULL_CHECK(json);
271✔
1044
  cJSON* type = cJSON_CreateString("drop");
271✔
1045
  RAW_NULL_CHECK(type);
271✔
1046
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
271✔
1047
  cJSON* tableNameList = cJSON_CreateArray();
271✔
1048
  RAW_NULL_CHECK(tableNameList);
271✔
1049
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableNameList", tableNameList));
271✔
1050

1051
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
613✔
1052
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;
342✔
1053
    cJSON*       tableName = cJSON_CreateString(pDropTbReq->name);
342✔
1054
    RAW_NULL_CHECK(tableName);
342✔
1055
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tableNameList, tableName));
342✔
1056
  }
1057

1058
end:
271✔
1059
  uDebug("processDropTable return");
271✔
1060
  tDecoderClear(&decoder);
271✔
1061
  *pJson = json;
271✔
1062
}
1063

1064
static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
5,748✔
1065
  if (taos == NULL || meta == NULL) {
5,748✔
UNCOV
1066
    uError("invalid parameter in %s", __func__);
×
UNCOV
1067
    return TSDB_CODE_INVALID_PARA;
×
1068
  }
1069
  SVCreateStbReq req = {0};
5,748✔
1070
  SDecoder       coder = {0};
5,748✔
1071
  SMCreateStbReq pReq = {0};
5,748✔
1072
  int32_t        code = TSDB_CODE_SUCCESS;
5,748✔
1073
  SRequestObj*   pRequest = NULL;
5,748✔
1074

1075
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
5,748✔
1076
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
5,748✔
1077
  pRequest->syncQuery = true;
5,748✔
1078
  if (!pRequest->pDb) {
5,748✔
1079
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1080
    goto end;
×
1081
  }
1082
  // decode and process req
1083
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
5,748✔
1084
  uint32_t len = metaLen - sizeof(SMsgHead);
5,748✔
1085
  tDecoderInit(&coder, data, len);
5,748✔
1086
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
5,748✔
UNCOV
1087
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1088
    goto end;
×
1089
  }
1090

1091
  int8_t           createDefaultCompress = 0;
5,748✔
1092
  SColCmprWrapper* p = &req.colCmpr;
5,748✔
1093
  if (p->nCols == 0) {
5,748✔
UNCOV
1094
    createDefaultCompress = 1;
×
1095
  }
1096
  // build create stable
1097
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
5,748✔
1098
  RAW_NULL_CHECK(pReq.pColumns);
5,748✔
1099
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
39,908✔
1100
    SSchema*          pSchema = req.schemaRow.pSchema + i;
34,160✔
1101
    SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
34,160✔
1102
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
34,160✔
1103

1104
    if (createDefaultCompress) {
34,160✔
UNCOV
1105
      field.compress = createDefaultColCmprByType(pSchema->type);
×
1106
    } else {
1107
      SColCmpr* pCmp = &req.colCmpr.pColCmpr[i];
34,160✔
1108
      field.compress = pCmp->alg;
34,160✔
1109
    }
1110
    if (req.pExtSchemas) field.typeMod = req.pExtSchemas[i].typeMod;
34,160✔
1111
    RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field));
68,320✔
1112
  }
1113
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
5,748✔
1114
  RAW_NULL_CHECK(pReq.pTags);
5,748✔
1115
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
21,476✔
1116
    SSchema* pSchema = req.schemaTag.pSchema + i;
15,728✔
1117
    SField   field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
15,728✔
1118
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
15,728✔
1119
    RAW_NULL_CHECK(taosArrayPush(pReq.pTags, &field));
31,456✔
1120
  }
1121

1122
  pReq.colVer = req.schemaRow.version;
5,748✔
1123
  pReq.tagVer = req.schemaTag.version;
5,748✔
1124
  pReq.numOfColumns = req.schemaRow.nCols;
5,748✔
1125
  pReq.numOfTags = req.schemaTag.nCols;
5,748✔
1126
  pReq.commentLen = -1;
5,748✔
1127
  pReq.suid = processSuid(req.suid, pRequest->pDb);
5,748✔
1128
  pReq.source = TD_REQ_FROM_TAOX;
5,748✔
1129
  pReq.igExists = true;
5,748✔
1130

1131
  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
5,748✔
1132
         pReq.suid);
1133
  STscObj* pTscObj = pRequest->pTscObj;
5,748✔
1134
  SName    tableName = {0};
5,748✔
1135
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
5,748✔
1136
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
5,748✔
1137
  SCmdMsgInfo pCmdMsg = {0};
5,748✔
1138
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
5,748✔
1139
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
5,748✔
1140
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
5,748✔
1141
  if (pCmdMsg.msgLen <= 0) {
5,748✔
UNCOV
1142
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1143
    goto end;
×
1144
  }
1145
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
5,748✔
1146
  RAW_NULL_CHECK(pCmdMsg.pMsg);
5,748✔
1147
  if (tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) <= 0) {
5,748✔
UNCOV
1148
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1149
    taosMemoryFree(pCmdMsg.pMsg);
×
UNCOV
1150
    goto end;
×
1151
  }
1152

1153
  SQuery pQuery = {0};
5,748✔
1154
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
5,748✔
1155
  pQuery.pCmdMsg = &pCmdMsg;
5,748✔
1156
  pQuery.msgType = pQuery.pCmdMsg->msgType;
5,748✔
1157
  pQuery.stableQuery = true;
5,748✔
1158

1159
  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
5,748✔
1160

1161
  taosMemoryFree(pCmdMsg.pMsg);
5,748✔
1162

1163
  if (pRequest->code == TSDB_CODE_SUCCESS) {
5,748✔
1164
    SCatalog* pCatalog = NULL;
5,748✔
1165
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
5,748✔
1166
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
5,748✔
1167
  }
1168

1169
  code = pRequest->code;
5,748✔
1170

1171
end:
5,748✔
1172
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
5,748✔
1173
  destroyRequest(pRequest);
5,748✔
1174
  tFreeSMCreateStbReq(&pReq);
5,748✔
1175
  tDecoderClear(&coder);
5,748✔
1176
  return code;
5,748✔
1177
}
1178

1179
static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) {
400✔
1180
  if (taos == NULL || meta == NULL) {
400✔
UNCOV
1181
    uError("invalid parameter in %s", __func__);
×
UNCOV
1182
    return TSDB_CODE_INVALID_PARA;
×
1183
  }
1184
  SVDropStbReq req = {0};
400✔
1185
  SDecoder     coder = {0};
400✔
1186
  SMDropStbReq pReq = {0};
400✔
1187
  int32_t      code = TSDB_CODE_SUCCESS;
400✔
1188
  SRequestObj* pRequest = NULL;
400✔
1189

1190
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
400✔
1191
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
400✔
1192
  pRequest->syncQuery = true;
400✔
1193
  if (!pRequest->pDb) {
400✔
1194
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1195
    goto end;
×
1196
  }
1197
  // decode and process req
1198
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
400✔
1199
  uint32_t len = metaLen - sizeof(SMsgHead);
400✔
1200
  tDecoderInit(&coder, data, len);
400✔
1201
  if (tDecodeSVDropStbReq(&coder, &req) < 0) {
400✔
UNCOV
1202
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1203
    goto end;
×
1204
  }
1205

1206
  SCatalog* pCatalog = NULL;
400✔
1207
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
400✔
1208
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
400✔
1209
                           .requestId = pRequest->requestId,
400✔
1210
                           .requestObjRefId = pRequest->self,
400✔
1211
                           .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
400✔
1212
  SName            pName = {0};
400✔
1213
  toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName);
400✔
1214
  STableMeta* pTableMeta = NULL;
400✔
1215
  code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
400✔
1216
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
400✔
1217
    code = TSDB_CODE_SUCCESS;
86✔
1218
    taosMemoryFreeClear(pTableMeta);
86✔
1219
    goto end;
86✔
1220
  }
1221
  if (code != TSDB_CODE_SUCCESS) {
314✔
UNCOV
1222
    goto end;
×
1223
  }
1224
  pReq.suid = pTableMeta->uid;
314✔
1225
  taosMemoryFreeClear(pTableMeta);
314✔
1226

1227
  // build drop stable
1228
  pReq.igNotExists = true;
314✔
1229
  pReq.source = TD_REQ_FROM_TAOX;
314✔
1230
  //  pReq.suid = processSuid(req.suid, pRequest->pDb);
1231

1232
  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
314✔
1233
         pReq.suid);
1234
  STscObj* pTscObj = pRequest->pTscObj;
314✔
1235
  SName    tableName = {0};
314✔
1236
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
314✔
1237
  if (tNameExtractFullName(&tableName, pReq.name) != 0) {
314✔
UNCOV
1238
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1239
    goto end;
×
1240
  }
1241

1242
  SCmdMsgInfo pCmdMsg = {0};
314✔
1243
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
314✔
1244
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
314✔
1245
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
314✔
1246
  if (pCmdMsg.msgLen <= 0) {
314✔
UNCOV
1247
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1248
    goto end;
×
1249
  }
1250
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
314✔
1251
  RAW_NULL_CHECK(pCmdMsg.pMsg);
314✔
1252
  if (tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) <= 0) {
314✔
UNCOV
1253
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1254
    taosMemoryFree(pCmdMsg.pMsg);
×
UNCOV
1255
    goto end;
×
1256
  }
1257

1258
  SQuery pQuery = {0};
314✔
1259
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
314✔
1260
  pQuery.pCmdMsg = &pCmdMsg;
314✔
1261
  pQuery.msgType = pQuery.pCmdMsg->msgType;
314✔
1262
  pQuery.stableQuery = true;
314✔
1263

1264
  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
314✔
1265
  taosMemoryFree(pCmdMsg.pMsg);
314✔
1266
  if (pRequest->code == TSDB_CODE_SUCCESS) {
314✔
1267
    // ignore the error code
1268
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
314✔
1269
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
314✔
1270
  }
1271

1272
  code = pRequest->code;
314✔
1273

1274
end:
400✔
1275
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
400✔
1276
  destroyRequest(pRequest);
400✔
1277
  tDecoderClear(&coder);
400✔
1278
  return code;
400✔
1279
}
1280

1281
typedef struct SVgroupCreateTableBatch {
1282
  SVCreateTbBatchReq req;
1283
  SVgroupInfo        info;
1284
  char               dbName[TSDB_DB_NAME_LEN];
1285
} SVgroupCreateTableBatch;
1286

1287
static void destroyCreateTbReqBatch(void* data) {
5,997✔
1288
  if (data == NULL) {
5,997✔
UNCOV
1289
    uError("invalid parameter in %s", __func__);
×
UNCOV
1290
    return;
×
1291
  }
1292
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
5,997✔
1293
  taosArrayDestroy(pTbBatch->req.pArray);
5,997✔
1294
}
1295

1296
static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) {
5,900✔
1297
  if (taos == NULL || meta == NULL) {
5,900✔
UNCOV
1298
    uError("invalid parameter in %s", __func__);
×
UNCOV
1299
    return TSDB_CODE_INVALID_PARA;
×
1300
  }
1301
  SVCreateTbBatchReq req = {0};
5,900✔
1302
  SDecoder           coder = {0};
5,900✔
1303
  int32_t            code = TSDB_CODE_SUCCESS;
5,900✔
1304
  SRequestObj*       pRequest = NULL;
5,900✔
1305
  SQuery*            pQuery = NULL;
5,900✔
1306
  SHashObj*          pVgroupHashmap = NULL;
5,900✔
1307
  SArray*            pTagList = taosArrayInit(0, POINTER_BYTES);
5,900✔
1308
  RAW_NULL_CHECK(pTagList);
5,900✔
1309
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
5,900✔
1310
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
5,900✔
1311

1312
  pRequest->syncQuery = true;
5,900✔
1313
  if (!pRequest->pDb) {
5,900✔
UNCOV
1314
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1315
    goto end;
×
1316
  }
1317
  // decode and process req
1318
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
5,900✔
1319
  uint32_t len = metaLen - sizeof(SMsgHead);
5,900✔
1320
  tDecoderInit(&coder, data, len);
5,900✔
1321
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
5,900✔
UNCOV
1322
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1323
    goto end;
×
1324
  }
1325

1326
  STscObj* pTscObj = pRequest->pTscObj;
5,900✔
1327

1328
  SVCreateTbReq* pCreateReq = NULL;
5,900✔
1329
  SCatalog*      pCatalog = NULL;
5,900✔
1330
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
5,900✔
1331
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
5,900✔
1332
  RAW_NULL_CHECK(pVgroupHashmap);
5,900✔
1333
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
5,900✔
1334

1335
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
5,900✔
1336
                           .requestId = pRequest->requestId,
5,900✔
1337
                           .requestObjRefId = pRequest->self,
5,900✔
1338
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
5,900✔
1339

1340
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
5,900✔
1341
  RAW_NULL_CHECK(pRequest->tableList);
5,900✔
1342
  // loop to create table
1343
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
12,630✔
1344
    pCreateReq = req.pReqs + iReq;
6,730✔
1345

1346
    SVgroupInfo pInfo = {0};
6,730✔
1347
    SName       pName = {0};
6,730✔
1348
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
6,730✔
1349
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
6,730✔
1350
    if (code != TSDB_CODE_SUCCESS) {
6,730✔
UNCOV
1351
      goto end;
×
1352
    }
1353

1354
    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
6,730✔
1355
    // change tag cid to new cid
1356
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
6,730✔
1357
      STableMeta* pTableMeta = NULL;
5,848✔
1358
      SName       sName = {0};
5,848✔
1359
      tb_uid_t    oldSuid = pCreateReq->ctb.suid;
5,848✔
1360
      //      pCreateReq->ctb.suid = processSuid(pCreateReq->ctb.suid, pRequest->pDb);
1361
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
5,848✔
1362
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
5,848✔
1363
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
5,848✔
UNCOV
1364
        code = TSDB_CODE_SUCCESS;
×
UNCOV
1365
        taosMemoryFreeClear(pTableMeta);
×
UNCOV
1366
        continue;
×
1367
      }
1368

1369
      if (code != TSDB_CODE_SUCCESS) {
5,848✔
UNCOV
1370
        goto end;
×
1371
      }
1372
      pCreateReq->ctb.suid = pTableMeta->uid;
5,848✔
1373

1374
      SArray* pTagVals = NULL;
5,848✔
1375
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
5,848✔
1376
      if (code != TSDB_CODE_SUCCESS) {
5,848✔
1377
        taosMemoryFreeClear(pTableMeta);
×
1378
        goto end;
×
1379
      }
1380

1381
      bool rebuildTag = false;
5,848✔
1382
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
18,116✔
1383
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
12,268✔
1384
        if (tName == NULL) {
12,268✔
UNCOV
1385
          continue;
×
1386
        }
1387
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
12,268✔
1388
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
48,210✔
1389
          SSchema* tag = &pTableMeta->schema[j];
35,942✔
1390
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
35,942✔
1391
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
11,556✔
1392
            if (pTagVal) {
11,556✔
1393
              if (pTagVal->cid != tag->colId) {
11,556✔
1394
                pTagVal->cid = tag->colId;
923✔
1395
                rebuildTag = true;
923✔
1396
              }
1397
            } else {
UNCOV
1398
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
×
1399
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
1400
            }
1401
          }
1402
        }
1403
      }
1404
      taosMemoryFreeClear(pTableMeta);
5,848✔
1405
      if (rebuildTag) {
5,848✔
1406
        STag* ppTag = NULL;
583✔
1407
        code = tTagNew(pTagVals, 1, false, &ppTag);
583✔
1408
        taosArrayDestroy(pTagVals);
583✔
1409
        pTagVals = NULL;
583✔
1410
        if (code != TSDB_CODE_SUCCESS) {
583✔
UNCOV
1411
          goto end;
×
1412
        }
1413
        if (NULL == taosArrayPush(pTagList, &ppTag)) {
583✔
UNCOV
1414
          tTagFree(ppTag);
×
UNCOV
1415
          goto end;
×
1416
        }
1417
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
583✔
1418
      }
1419
      taosArrayDestroy(pTagVals);
5,848✔
1420
    }
1421
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
13,460✔
1422

1423
    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
6,730✔
1424
    if (pTableBatch == NULL) {
6,730✔
1425
      SVgroupCreateTableBatch tBatch = {0};
5,997✔
1426
      tBatch.info = pInfo;
5,997✔
1427
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);
5,997✔
1428

1429
      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
5,997✔
1430
      RAW_NULL_CHECK(tBatch.req.pArray);
5,997✔
1431
      RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pCreateReq));
11,994✔
1432
      tBatch.req.source = TD_REQ_FROM_TAOX;
5,997✔
1433
      RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)));
5,997✔
1434
    } else {  // add to the correct vgroup
1435
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
1,466✔
1436
    }
1437
  }
1438

1439
  if (taosHashGetSize(pVgroupHashmap) == 0) {
5,900✔
UNCOV
1440
    goto end;
×
1441
  }
1442
  SArray* pBufArray = NULL;
5,900✔
1443
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
5,900✔
1444
  pQuery = NULL;
5,900✔
1445
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
5,900✔
1446
  if (TSDB_CODE_SUCCESS != code) goto end;
5,900✔
1447
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
5,900✔
1448
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
5,900✔
1449
  pQuery->stableQuery = false;
5,900✔
1450
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
5,900✔
1451
  if (TSDB_CODE_SUCCESS != code) goto end;
5,900✔
1452
  RAW_NULL_CHECK(pQuery->pRoot);
5,900✔
1453

1454
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));
5,900✔
1455

1456
  launchQueryImpl(pRequest, pQuery, true, NULL);
5,900✔
1457
  if (pRequest->code == TSDB_CODE_SUCCESS) {
5,900✔
1458
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
5,900✔
1459
  }
1460

1461
  code = pRequest->code;
5,900✔
1462

1463
end:
5,900✔
1464
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
5,900✔
1465
  tDeleteSVCreateTbBatchReq(&req);
5,900✔
1466

1467
  taosHashCleanup(pVgroupHashmap);
5,900✔
1468
  destroyRequest(pRequest);
5,900✔
1469
  tDecoderClear(&coder);
5,900✔
1470
  qDestroyQuery(pQuery);
5,900✔
1471
  taosArrayDestroyP(pTagList, NULL);
5,900✔
1472
  return code;
5,900✔
1473
}
1474

1475
typedef struct SVgroupDropTableBatch {
1476
  SVDropTbBatchReq req;
1477
  SVgroupInfo      info;
1478
  char             dbName[TSDB_DB_NAME_LEN];
1479
} SVgroupDropTableBatch;
1480

1481
static void destroyDropTbReqBatch(void* data) {
185✔
1482
  if (data == NULL) {
185✔
UNCOV
1483
    uError("invalid parameter in %s", __func__);
×
UNCOV
1484
    return;
×
1485
  }
1486
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
185✔
1487
  taosArrayDestroy(pTbBatch->req.pArray);
185✔
1488
}
1489

1490
static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) {
271✔
1491
  if (taos == NULL || meta == NULL) {
271✔
UNCOV
1492
    uError("invalid parameter in %s", __func__);
×
UNCOV
1493
    return TSDB_CODE_INVALID_PARA;
×
1494
  }
1495
  SVDropTbBatchReq req = {0};
271✔
1496
  SDecoder         coder = {0};
271✔
1497
  int32_t          code = TSDB_CODE_SUCCESS;
271✔
1498
  SRequestObj*     pRequest = NULL;
271✔
1499
  SQuery*          pQuery = NULL;
271✔
1500
  SHashObj*        pVgroupHashmap = NULL;
271✔
1501

1502
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
271✔
1503
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
271✔
1504

1505
  pRequest->syncQuery = true;
271✔
1506
  if (!pRequest->pDb) {
271✔
UNCOV
1507
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1508
    goto end;
×
1509
  }
1510
  // decode and process req
1511
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
271✔
1512
  uint32_t len = metaLen - sizeof(SMsgHead);
271✔
1513
  tDecoderInit(&coder, data, len);
271✔
1514
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
271✔
UNCOV
1515
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1516
    goto end;
×
1517
  }
1518

1519
  STscObj* pTscObj = pRequest->pTscObj;
271✔
1520

1521
  SVDropTbReq* pDropReq = NULL;
271✔
1522
  SCatalog*    pCatalog = NULL;
271✔
1523
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
271✔
1524

1525
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
271✔
1526
  RAW_NULL_CHECK(pVgroupHashmap);
271✔
1527
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
271✔
1528

1529
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
271✔
1530
                           .requestId = pRequest->requestId,
271✔
1531
                           .requestObjRefId = pRequest->self,
271✔
1532
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
271✔
1533
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
271✔
1534
  RAW_NULL_CHECK(pRequest->tableList);
271✔
1535
  // loop to create table
1536
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
613✔
1537
    pDropReq = req.pReqs + iReq;
342✔
1538
    pDropReq->igNotExists = true;
342✔
1539
    //    pDropReq->suid = processSuid(pDropReq->suid, pRequest->pDb);
1540

1541
    SVgroupInfo pInfo = {0};
342✔
1542
    SName       pName = {0};
342✔
1543
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
342✔
1544
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
342✔
1545

1546
    STableMeta* pTableMeta = NULL;
342✔
1547
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
342✔
1548
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
342✔
1549
      code = TSDB_CODE_SUCCESS;
86✔
1550
      taosMemoryFreeClear(pTableMeta);
86✔
1551
      continue;
86✔
1552
    }
1553
    if (code != TSDB_CODE_SUCCESS) {
256✔
UNCOV
1554
      goto end;
×
1555
    }
1556
    tb_uid_t oldSuid = pDropReq->suid;
256✔
1557
    pDropReq->suid = pTableMeta->suid;
256✔
1558
    taosMemoryFreeClear(pTableMeta);
256✔
1559
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
256✔
1560
           pDropReq->suid);
1561

1562
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
512✔
1563
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
256✔
1564
    if (pTableBatch == NULL) {
256✔
1565
      SVgroupDropTableBatch tBatch = {0};
185✔
1566
      tBatch.info = pInfo;
185✔
1567
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
185✔
1568
      RAW_NULL_CHECK(tBatch.req.pArray);
185✔
1569
      RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pDropReq));
370✔
1570
      RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)));
185✔
1571
    } else {  // add to the correct vgroup
1572
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
142✔
1573
    }
1574
  }
1575

1576
  if (taosHashGetSize(pVgroupHashmap) == 0) {
271✔
1577
    goto end;
86✔
1578
  }
1579
  SArray* pBufArray = NULL;
185✔
1580
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
185✔
1581
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
185✔
1582
  if (TSDB_CODE_SUCCESS != code) goto end;
185✔
1583
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
185✔
1584
  pQuery->msgType = TDMT_VND_DROP_TABLE;
185✔
1585
  pQuery->stableQuery = false;
185✔
1586
  pQuery->pRoot = NULL;
185✔
1587
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
185✔
1588
  if (TSDB_CODE_SUCCESS != code) goto end;
185✔
1589
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));
185✔
1590

1591
  launchQueryImpl(pRequest, pQuery, true, NULL);
185✔
1592
  if (pRequest->code == TSDB_CODE_SUCCESS) {
185✔
1593
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
185✔
1594
  }
1595
  code = pRequest->code;
185✔
1596

1597
end:
271✔
1598
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
271✔
1599
  taosHashCleanup(pVgroupHashmap);
271✔
1600
  destroyRequest(pRequest);
271✔
1601
  tDecoderClear(&coder);
271✔
1602
  qDestroyQuery(pQuery);
271✔
1603
  return code;
271✔
1604
}
1605

1606
static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) {
167✔
1607
  if (taos == NULL || meta == NULL) {
167✔
UNCOV
1608
    uError("invalid parameter in %s", __func__);
×
UNCOV
1609
    return TSDB_CODE_INVALID_PARA;
×
1610
  }
1611
  SDeleteRes req = {0};
167✔
1612
  SDecoder   coder = {0};
167✔
1613
  char       sql[256] = {0};
167✔
1614
  int32_t    code = TSDB_CODE_SUCCESS;
167✔
1615

1616
  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);
167✔
1617

1618
  // decode and process req
1619
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
167✔
1620
  uint32_t len = metaLen - sizeof(SMsgHead);
167✔
1621
  tDecoderInit(&coder, data, len);
167✔
1622
  if (tDecodeDeleteRes(&coder, &req) < 0) {
167✔
UNCOV
1623
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1624
    goto end;
×
1625
  }
1626

1627
  (void)snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
167✔
1628
                 req.tsColName, req.skey, req.tsColName, req.ekey);
1629

1630
  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
167✔
1631
  RAW_NULL_CHECK(res);
167✔
1632
  SRequestObj* pRequest = (SRequestObj*)res;
167✔
1633
  code = pRequest->code;
167✔
1634
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
167✔
1635
    code = TSDB_CODE_SUCCESS;
43✔
1636
  }
1637
  taos_free_result(res);
167✔
1638

1639
end:
167✔
1640
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));
167✔
1641
  tDecoderClear(&coder);
167✔
1642
  return code;
167✔
1643
}
1644

1645
static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) {
1,428✔
1646
  if (taos == NULL || meta == NULL) {
1,428✔
UNCOV
1647
    uError("invalid parameter in %s", __func__);
×
UNCOV
1648
    return TSDB_CODE_INVALID_PARA;
×
1649
  }
1650
  SVAlterTbReq   req = {0};
1,428✔
1651
  SDecoder       dcoder = {0};
1,428✔
1652
  int32_t        code = TSDB_CODE_SUCCESS;
1,428✔
1653
  SRequestObj*   pRequest = NULL;
1,428✔
1654
  SQuery*        pQuery = NULL;
1,428✔
1655
  SArray*        pArray = NULL;
1,428✔
1656
  SVgDataBlocks* pVgData = NULL;
1,428✔
1657

1658
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
1,428✔
1659
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
1,428✔
1660
  pRequest->syncQuery = true;
1,428✔
1661
  if (!pRequest->pDb) {
1,428✔
UNCOV
1662
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1663
    goto end;
×
1664
  }
1665
  // decode and process req
1666
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
1,428✔
1667
  uint32_t len = metaLen - sizeof(SMsgHead);
1,428✔
1668
  tDecoderInit(&dcoder, data, len);
1,428✔
1669
  if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
1,428✔
UNCOV
1670
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1671
    goto end;
×
1672
  }
1673

1674
  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
1675
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
1,428✔
1676
    goto end;
238✔
1677
  }
1678

1679
  STscObj*  pTscObj = pRequest->pTscObj;
1,190✔
1680
  SCatalog* pCatalog = NULL;
1,190✔
1681
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
1,190✔
1682
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
1,190✔
1683
                           .requestId = pRequest->requestId,
1,190✔
1684
                           .requestObjRefId = pRequest->self,
1,190✔
1685
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
1,190✔
1686

1687
  SVgroupInfo pInfo = {0};
1,190✔
1688
  SName       pName = {0};
1,190✔
1689
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
1,190✔
1690
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
1,190✔
1691
  pArray = taosArrayInit(1, sizeof(void*));
1,190✔
1692
  RAW_NULL_CHECK(pArray);
1,190✔
1693

1694
  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
1,190✔
1695
  RAW_NULL_CHECK(pVgData);
1,190✔
1696
  pVgData->vg = pInfo;
1,190✔
1697

1698
  int tlen = 0;
1,190✔
1699
  req.source = TD_REQ_FROM_TAOX;
1,190✔
1700
  tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
1,190✔
1701
  if (code != 0) {
1,190✔
UNCOV
1702
    code = terrno;
×
UNCOV
1703
    goto end;
×
1704
  }
1705
  tlen += sizeof(SMsgHead);
1,190✔
1706
  void* pMsg = taosMemoryMalloc(tlen);
1,190✔
1707
  RAW_NULL_CHECK(pMsg);
1,190✔
1708
  ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
1,190✔
1709
  ((SMsgHead*)pMsg)->contLen = htonl(tlen);
1,190✔
1710
  void*    pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
1,190✔
1711
  SEncoder coder = {0};
1,190✔
1712
  tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
1,190✔
1713
  code = tEncodeSVAlterTbReq(&coder, &req);
1,190✔
1714
  if (code != 0) {
1,190✔
1715
    tEncoderClear(&coder);
×
UNCOV
1716
    code = terrno;
×
UNCOV
1717
    goto end;
×
1718
  }
1719
  tEncoderClear(&coder);
1,190✔
1720

1721
  pVgData->pData = pMsg;
1,190✔
1722
  pVgData->size = tlen;
1,190✔
1723

1724
  pVgData->numOfTables = 1;
1,190✔
1725
  RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));
1,190✔
1726

1727
  pQuery = NULL;
1,190✔
1728
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
1,190✔
1729
  if (NULL == pQuery) goto end;
1,190✔
1730
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
1,190✔
1731
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
1,190✔
1732
  pQuery->stableQuery = false;
1,190✔
1733
  pQuery->pRoot = NULL;
1,190✔
1734
  code = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot);
1,190✔
1735
  if (TSDB_CODE_SUCCESS != code) goto end;
1,190✔
1736
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));
1,190✔
1737

1738
  launchQueryImpl(pRequest, pQuery, true, NULL);
1,190✔
1739

1740
  pVgData = NULL;
1,190✔
1741
  pArray = NULL;
1,190✔
1742
  code = pRequest->code;
1,190✔
1743
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
1,190✔
1744
    code = TSDB_CODE_SUCCESS;
43✔
1745
  }
1746

1747
  if (pRequest->code == TSDB_CODE_SUCCESS) {
1,190✔
1748
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
1,147✔
1749
    if (pRes->res != NULL) {
1,147✔
1750
      code = handleAlterTbExecRes(pRes->res, pCatalog);
952✔
1751
    }
1752
  }
1753
end:
1,428✔
1754
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));
1,428✔
1755
  taosArrayDestroy(pArray);
1,428✔
1756
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
1,428✔
1757
  taosMemoryFreeClear(pVgData);
1,428✔
1758
  destroyRequest(pRequest);
1,428✔
1759
  tDecoderClear(&dcoder);
1,428✔
1760
  qDestroyQuery(pQuery);
1,428✔
1761
  taosArrayDestroy(req.pMultiTag);
1,428✔
1762
  return code;
1,428✔
1763
}
1764

1765
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
46✔
1766
                                     int numFields) {
1767
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, 0);
46✔
1768
}
1769

1770
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
46✔
1771
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
1772
  if (taos == NULL || pData == NULL || tbname == NULL) {
46✔
UNCOV
1773
    uError("invalid parameter in %s", __func__);
×
UNCOV
1774
    return TSDB_CODE_INVALID_PARA;
×
1775
  }
1776
  int32_t     code = TSDB_CODE_SUCCESS;
46✔
1777
  STableMeta* pTableMeta = NULL;
46✔
1778
  SQuery*     pQuery = NULL;
46✔
1779
  SHashObj*   pVgHash = NULL;
46✔
1780

1781
  SRequestObj* pRequest = NULL;
46✔
1782
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
46✔
1783

1784
  uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
46✔
1785
         rows, pData, tbname, fields, numFields);
1786

1787
  pRequest->syncQuery = true;
46✔
1788
  if (!pRequest->pDb) {
46✔
UNCOV
1789
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1790
    goto end;
×
1791
  }
1792

1793
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
46✔
1794
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
46✔
1795
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
46✔
1796

1797
  struct SCatalog* pCatalog = NULL;
46✔
1798
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
46✔
1799

1800
  SRequestConnInfo conn = {0};
46✔
1801
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
46✔
1802
  conn.requestId = pRequest->requestId;
46✔
1803
  conn.requestObjRefId = pRequest->self;
46✔
1804
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
46✔
1805

1806
  SVgroupInfo vgData = {0};
46✔
1807
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
46✔
1808
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
46✔
1809
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
46✔
1810
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
46✔
1811
  RAW_NULL_CHECK(pVgHash);
46✔
1812
  RAW_RETURN_CHECK(
46✔
1813
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1814
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
46✔
1815
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
46✔
1816

1817
  launchQueryImpl(pRequest, pQuery, true, NULL);
46✔
1818
  code = pRequest->code;
46✔
1819

1820
end:
46✔
1821
  uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code));
46✔
1822
  taosMemoryFreeClear(pTableMeta);
46✔
1823
  qDestroyQuery(pQuery);
46✔
1824
  destroyRequest(pRequest);
46✔
1825
  taosHashCleanup(pVgHash);
46✔
1826
  return code;
46✔
1827
}
1828

1829
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
322✔
1830
  return taos_write_raw_block_with_reqid(taos, rows, pData, tbname, 0);
322✔
1831
}
1832

1833
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
322✔
1834
  if (taos == NULL || pData == NULL || tbname == NULL) {
322✔
UNCOV
1835
    return TSDB_CODE_INVALID_PARA;
×
1836
  }
1837
  int32_t     code = TSDB_CODE_SUCCESS;
322✔
1838
  STableMeta* pTableMeta = NULL;
322✔
1839
  SQuery*     pQuery = NULL;
322✔
1840
  SHashObj*   pVgHash = NULL;
322✔
1841

1842
  SRequestObj* pRequest = NULL;
322✔
1843
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
322✔
1844

1845
  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);
322✔
1846

1847
  pRequest->syncQuery = true;
322✔
1848
  if (!pRequest->pDb) {
322✔
UNCOV
1849
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1850
    goto end;
×
1851
  }
1852

1853
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
322✔
1854
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
322✔
1855
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
322✔
1856

1857
  struct SCatalog* pCatalog = NULL;
322✔
1858
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
322✔
1859

1860
  SRequestConnInfo conn = {0};
322✔
1861
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
322✔
1862
  conn.requestId = pRequest->requestId;
322✔
1863
  conn.requestObjRefId = pRequest->self;
322✔
1864
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
322✔
1865

1866
  SVgroupInfo vgData = {0};
322✔
1867
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
322✔
1868
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
322✔
1869
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
276✔
1870
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
276✔
1871
  RAW_NULL_CHECK(pVgHash);
276✔
1872
  RAW_RETURN_CHECK(
276✔
1873
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1874
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false));
276✔
1875
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
184✔
1876

1877
  launchQueryImpl(pRequest, pQuery, true, NULL);
184✔
1878
  code = pRequest->code;
184✔
1879

1880
end:
322✔
1881
  uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code));
322✔
1882
  taosMemoryFreeClear(pTableMeta);
322✔
1883
  qDestroyQuery(pQuery);
322✔
1884
  destroyRequest(pRequest);
322✔
1885
  taosHashCleanup(pVgHash);
322✔
1886
  return code;
322✔
1887
}
1888

1889
static void* getRawDataFromRes(void* pRetrieve) {
5,913✔
1890
  if (pRetrieve == NULL) {
5,913✔
UNCOV
1891
    uError("invalid parameter in %s", __func__);
×
UNCOV
1892
    return NULL;
×
1893
  }
1894
  void* rawData = NULL;
5,913✔
1895
  // deal with compatibility
1896
  if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
5,913✔
UNCOV
1897
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
1898
  } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
5,913✔
UNCOV
1899
             *(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
×
1900
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
5,913✔
1901
  }
1902
  return rawData;
5,913✔
1903
}
1904

1905
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
264✔
1906
  if (rsp == NULL || pHashObj == NULL) {
264✔
UNCOV
1907
    uError("invalid parameter in %s", __func__);
×
UNCOV
1908
    return TSDB_CODE_INVALID_PARA;
×
1909
  }
1910
  // find schema data info
1911
  int32_t       code = 0;
264✔
1912
  SVCreateTbReq pCreateReq = {0};
264✔
1913
  SDecoder      decoderTmp = {0};
264✔
1914

1915
  for (int j = 0; j < rsp->createTableNum; j++) {
932✔
1916
    void** dataTmp = taosArrayGet(rsp->createTableReq, j);
668✔
1917
    RAW_NULL_CHECK(dataTmp);
668✔
1918
    int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j);
668✔
1919
    RAW_NULL_CHECK(lenTmp);
668✔
1920

1921
    tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
668✔
1922
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));
668✔
1923

1924
    if (pCreateReq.type != TSDB_CHILD_TABLE) {
668✔
UNCOV
1925
      code = TSDB_CODE_INVALID_MSG;
×
UNCOV
1926
      goto end;
×
1927
    }
1928
    if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL) {
668✔
1929
      RAW_RETURN_CHECK(
668✔
1930
          taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq)));
1931
    } else {
UNCOV
1932
      tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
×
UNCOV
1933
      pCreateReq = (SVCreateTbReq){0};
×
1934
    }
1935

1936
    tDecoderClear(&decoderTmp);
668✔
1937
  }
1938
  return 0;
264✔
1939

UNCOV
1940
end:
×
UNCOV
1941
  tDecoderClear(&decoderTmp);
×
UNCOV
1942
  tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
×
UNCOV
1943
  return code;
×
1944
}
1945

1946
typedef enum {
1947
  WRITE_RAW_INIT_START = 0,
1948
  WRITE_RAW_INIT_OK,
1949
  WRITE_RAW_INIT_FAIL,
1950
} WRITE_RAW_INIT_STATUS;
1951

1952
static SHashObj* writeRawCache = NULL;
1953
static int8_t    initFlag = 0;
1954
static int8_t    initedFlag = WRITE_RAW_INIT_START;
1955

1956
typedef struct {
1957
  SHashObj* pVgHash;
1958
  SHashObj* pNameHash;
1959
  SHashObj* pMetaHash;
1960
} rawCacheInfo;
1961

1962
typedef struct {
1963
  SVgroupInfo vgInfo;
1964
  int64_t     uid;
1965
  int64_t     suid;
1966
} tbInfo;
1967

1968
static void tmqFreeMeta(void* data) {
1,765✔
1969
  if (data == NULL) {
1,765✔
UNCOV
1970
    uError("invalid parameter in %s", __func__);
×
UNCOV
1971
    return;
×
1972
  }
1973
  STableMeta* pTableMeta = *(STableMeta**)data;
1,765✔
1974
  taosMemoryFree(pTableMeta);
1,765✔
1975
}
1976

UNCOV
1977
static void freeRawCache(void* data) {
×
UNCOV
1978
  if (data == NULL) {
×
UNCOV
1979
    uError("invalid parameter in %s", __func__);
×
UNCOV
1980
    return;
×
1981
  }
1982
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
1983
  taosHashCleanup(pRawCache->pMetaHash);
×
UNCOV
1984
  taosHashCleanup(pRawCache->pNameHash);
×
UNCOV
1985
  taosHashCleanup(pRawCache->pVgHash);
×
1986
}
1987

1988
static int32_t initRawCacheHash() {
618✔
1989
  if (writeRawCache == NULL) {
618✔
1990
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
618✔
1991
    if (writeRawCache == NULL) {
618✔
1992
      return terrno;
×
1993
    }
1994
    taosHashSetFreeFp(writeRawCache, freeRawCache);
618✔
1995
  }
1996
  return 0;
618✔
1997
}
1998

1999
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
733✔
2000
  if (rawData == NULL || pSW == NULL) {
733✔
UNCOV
2001
    return false;
×
2002
  }
2003
  if (pTableMeta == NULL) {
733✔
2004
    uError("invalid parameter in %s", __func__);
×
UNCOV
2005
    return false;
×
2006
  }
2007
  char* p = (char*)rawData;
733✔
2008
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
2009
  // column length |
2010
  p += sizeof(int32_t);
733✔
2011
  p += sizeof(int32_t);
733✔
2012
  p += sizeof(int32_t);
733✔
2013
  p += sizeof(int32_t);
733✔
2014
  p += sizeof(int32_t);
733✔
2015
  p += sizeof(uint64_t);
733✔
2016
  int8_t* fields = p;
733✔
2017

2018
  if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
733✔
2019
    return true;
221✔
2020
  }
2021

2022
  for (int i = 0; i < pSW->nCols; i++) {
2,684✔
2023
    int j = 0;
2,172✔
2024
    for (; j < pTableMeta->tableInfo.numOfColumns; j++) {
5,740✔
2025
      SSchema*    pColSchema = &pTableMeta->schema[j];
5,740✔
2026
      SSchemaExt* pColExtSchema = &pTableMeta->schemaExt[j];
5,740✔
2027
      char*       fieldName = pSW->pSchema[i].name;
5,740✔
2028

2029
      if (strcmp(pColSchema->name, fieldName) == 0) {
5,740✔
2030
        if (checkSchema(pColSchema, pColExtSchema, fields, NULL, 0) != 0) {
2,172✔
UNCOV
2031
          return true;
×
2032
        }
2033
        break;
2,172✔
2034
      }
2035
    }
2036
    fields += sizeof(int8_t) + sizeof(int32_t);
2,172✔
2037

2038
    if (j == pTableMeta->tableInfo.numOfColumns) return true;
2,172✔
2039
  }
2040
  return false;
512✔
2041
}
2042

2043
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
2,207✔
2044
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) {
2,207✔
UNCOV
2045
    uError("invalid parameter in %s", __func__);
×
UNCOV
2046
    return TSDB_CODE_INVALID_PARA;
×
2047
  }
2048
  int32_t code = 0;
2,207✔
2049
  void*   cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
2,207✔
2050
  if (cacheInfo == NULL) {
2,207✔
2051
    *pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
2,207✔
2052
    RAW_NULL_CHECK(*pVgHash);
2,207✔
2053
    *pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
2,207✔
2054
    RAW_NULL_CHECK(*pNameHash);
2,207✔
2055
    *pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
2,207✔
2056
    RAW_NULL_CHECK(*pMetaHash);
2,207✔
2057
    taosHashSetFreeFp(*pMetaHash, tmqFreeMeta);
2,207✔
2058
    rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash};
2,207✔
2059
    RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));
2,207✔
2060
  } else {
UNCOV
2061
    rawCacheInfo* info = (rawCacheInfo*)cacheInfo;
×
UNCOV
2062
    *pVgHash = info->pVgHash;
×
UNCOV
2063
    *pNameHash = info->pNameHash;
×
UNCOV
2064
    *pMetaHash = info->pMetaHash;
×
2065
  }
2066

2067
  return 0;
2,207✔
UNCOV
2068
end:
×
UNCOV
2069
  taosHashCleanup(*pMetaHash);
×
UNCOV
2070
  taosHashCleanup(*pNameHash);
×
UNCOV
2071
  taosHashCleanup(*pVgHash);
×
UNCOV
2072
  return code;
×
2073
}
2074

2075
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
2,207✔
2076
  if (taos == NULL || pRequest == NULL || pCatalog == NULL || conn == NULL) {
2,207✔
UNCOV
2077
    uError("invalid parameter in %s", __func__);
×
UNCOV
2078
    return TSDB_CODE_INVALID_PARA;
×
2079
  }
2080
  int32_t code = 0;
2,207✔
2081
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));
2,207✔
2082
  (*pRequest)->syncQuery = true;
2,207✔
2083
  if (!(*pRequest)->pDb) {
2,207✔
2084
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
2085
    goto end;
×
2086
  }
2087

2088
  RAW_RETURN_CHECK(catalogGetHandle((*pRequest)->pTscObj->pAppInfo->clusterId, pCatalog));
2,207✔
2089
  conn->pTrans = (*pRequest)->pTscObj->pAppInfo->pTransporter;
2,207✔
2090
  conn->requestId = (*pRequest)->requestId;
2,207✔
2091
  conn->requestObjRefId = (*pRequest)->self;
2,207✔
2092
  conn->mgmtEps = getEpSet_s(&(*pRequest)->pTscObj->pAppInfo->mgmtEp);
2,207✔
2093

2094
end:
2,207✔
2095
  return code;
2,207✔
2096
}
2097

2098
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
2099
static int32_t  decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func,
2,207✔
2100
                              SMqRspObj* rspObj) {
2101
  if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) {
2,207✔
UNCOV
2102
    uError("invalid parameter in %s", __func__);
×
UNCOV
2103
    return TSDB_CODE_INVALID_PARA;
×
2104
  }
2105
  int8_t dataVersion = *(int8_t*)data;
2,207✔
2106
  if (dataVersion >= MQ_DATA_RSP_VERSION) {
2,207✔
2107
    data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
2,207✔
2108
    if (dataLen < sizeof(int8_t) + sizeof(int32_t)) {
2,207✔
UNCOV
2109
      return TSDB_CODE_INVALID_PARA;
×
2110
    }
2111
    dataLen -= sizeof(int8_t) + sizeof(int32_t);
2,207✔
2112
  }
2113

2114
  rspObj->resIter = -1;
2,207✔
2115
  tDecoderInit(decoder, data, dataLen);
2,207✔
2116
  int32_t code = func(decoder, &rspObj->dataRsp);
2,207✔
2117
  if (code != 0) {
2,207✔
UNCOV
2118
    SET_ERROR_MSG("decode mq taosx data rsp failed");
×
2119
  }
2120
  return code;
2,207✔
2121
}
2122

2123
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
5,913✔
2124
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
2125
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
2126
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
5,913✔
2127
      pMeta == NULL) {
UNCOV
2128
    uError("invalid parameter in %s", __func__);
×
UNCOV
2129
    return TSDB_CODE_INVALID_PARA;
×
2130
  }
2131
  int32_t     code = 0;
5,913✔
2132
  STableMeta* pTableMeta = NULL;
5,913✔
2133
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
5,913✔
2134
  if (tmpInfo == NULL || retry > 0) {
5,913✔
2135
    tbInfo info = {0};
5,180✔
2136

2137
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
5,180✔
2138
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
5,180✔
2139
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
668✔
2140
    }
2141
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
5,180✔
2142
    info.uid = pTableMeta->uid;
5,180✔
2143
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
5,180✔
2144
      info.suid = pTableMeta->suid;
3,815✔
2145
    } else {
2146
      info.suid = pTableMeta->uid;
1,365✔
2147
    }
2148
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
5,180✔
2149
    if (code != 0) {
5,180✔
UNCOV
2150
      taosMemoryFree(pTableMeta);
×
UNCOV
2151
      goto end;
×
2152
    }
2153
    uDebug("put table meta to hash1, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d", info.suid, taosHashGetSize(pMetaHash),
5,180✔
2154
           taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2155
    if (pCreateReqDst) {
5,180✔
2156
      pTableMeta->vgId = info.vgInfo.vgId;
668✔
2157
      pTableMeta->uid = pCreateReqDst->uid;
668✔
2158
      pCreateReqDst->ctb.suid = pTableMeta->suid;
668✔
2159
    }
2160

2161
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
5,180✔
2162
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
5,180✔
2163
    RAW_RETURN_CHECK(
5,180✔
2164
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
2165
  }
2166

2167
  if (pTableMeta == NULL || retry > 0) {
5,913✔
2168
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
733✔
2169
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
733✔
2170
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
221✔
2171
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
221✔
2172
      if (code != 0) {
221✔
UNCOV
2173
        taosMemoryFree(pTableMeta);
×
UNCOV
2174
        goto end;
×
2175
      }
2176
      uDebug("put table meta to hash2, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d", tmpInfo->suid, taosHashGetSize(pMetaHash),
221✔
2177
      taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2178
    } else {
2179
      pTableMeta = *pTableMetaTmp;
512✔
2180
      pTableMeta->uid = tmpInfo->uid;
512✔
2181
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
512✔
2182
    }
2183
  }
2184
  *pMeta = pTableMeta;
5,913✔
2185

2186
end:
5,913✔
2187
  return code;
5,913✔
2188
}
2189

2190
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
1,943✔
2191
  if (taos == NULL || data == NULL) {
1,943✔
UNCOV
2192
    uError("invalid parameter in %s", __func__);
×
UNCOV
2193
    return TSDB_CODE_INVALID_PARA;
×
2194
  }
2195
  int32_t   code = TSDB_CODE_SUCCESS;
1,943✔
2196
  SQuery*   pQuery = NULL;
1,943✔
2197
  SMqRspObj rspObj = {0};
1,943✔
2198
  SDecoder  decoder = {0};
1,943✔
2199

2200
  SRequestObj*     pRequest = NULL;
1,943✔
2201
  SCatalog*        pCatalog = NULL;
1,943✔
2202
  SRequestConnInfo conn = {0};
1,943✔
2203
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
1,943✔
2204
  uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
1,943✔
2205
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
1,943✔
2206

2207
  SHashObj* pVgHash = NULL;
1,943✔
2208
  SHashObj* pNameHash = NULL;
1,943✔
2209
  SHashObj* pMetaHash = NULL;
1,943✔
2210
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
1,943✔
2211
  int retry = 0;
1,943✔
2212
  while (1) {
2213
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
1,943✔
2214
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
1,943✔
2215
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
6,687✔
2216
      if (!rspObj.dataRsp.withSchema) {
4,744✔
UNCOV
2217
        goto end;
×
2218
      }
2219

2220
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
4,744✔
2221
      RAW_NULL_CHECK(tbName);
4,744✔
2222
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
4,744✔
2223
      RAW_NULL_CHECK(pSW);
4,744✔
2224
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
4,744✔
2225
      RAW_NULL_CHECK(pRetrieve);
4,744✔
2226
      void* rawData = getRawDataFromRes(pRetrieve);
4,744✔
2227
      RAW_NULL_CHECK(rawData);
4,744✔
2228

2229
      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
4,744✔
2230
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
4,744✔
2231
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
4,744✔
2232
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
4,744✔
2233

2234
      STableMeta* pTableMeta = NULL;
4,744✔
2235
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, pSW,
4,744✔
2236
                                        rawData, retry));
2237
      char err[ERR_MSG_LEN] = {0};
4,744✔
2238
      code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
4,744✔
2239
      if (code != TSDB_CODE_SUCCESS) {
4,744✔
UNCOV
2240
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
×
UNCOV
2241
        goto end;
×
2242
      }
2243
    }
2244
    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
1,943✔
2245
    launchQueryImpl(pRequest, pQuery, true, NULL);
1,943✔
2246
    code = pRequest->code;
1,943✔
2247

2248
    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
1,943✔
UNCOV
2249
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
×
UNCOV
2250
      qDestroyQuery(pQuery);
×
UNCOV
2251
      pQuery = NULL;
×
2252
      rspObj.resIter = -1;
×
2253
      continue;
×
2254
    }
2255
    break;
1,943✔
2256
  }
2257

2258
end:
1,943✔
2259
  uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
1,943✔
2260
  tDeleteMqDataRsp(&rspObj.dataRsp);
1,943✔
2261
  tDecoderClear(&decoder);
1,943✔
2262
  qDestroyQuery(pQuery);
1,943✔
2263
  destroyRequest(pRequest);
1,943✔
2264
  return code;
1,943✔
2265
}
2266

2267
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
264✔
2268
  if (taos == NULL || data == NULL) {
264✔
UNCOV
2269
    uError("invalid parameter in %s", __func__);
×
UNCOV
2270
    return TSDB_CODE_INVALID_PARA;
×
2271
  }
2272
  int32_t   code = TSDB_CODE_SUCCESS;
264✔
2273
  SQuery*   pQuery = NULL;
264✔
2274
  SMqRspObj rspObj = {0};
264✔
2275
  SDecoder  decoder = {0};
264✔
2276
  SHashObj* pCreateTbHash = NULL;
264✔
2277

2278
  SRequestObj*     pRequest = NULL;
264✔
2279
  SCatalog*        pCatalog = NULL;
264✔
2280
  SRequestConnInfo conn = {0};
264✔
2281

2282
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
264✔
2283
  uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
264✔
2284
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));
264✔
2285

2286
  pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
264✔
2287
  RAW_NULL_CHECK(pCreateTbHash);
264✔
2288
  RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
264✔
2289

2290
  SHashObj* pVgHash = NULL;
264✔
2291
  SHashObj* pNameHash = NULL;
264✔
2292
  SHashObj* pMetaHash = NULL;
264✔
2293
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
264✔
2294
  int retry = 0;
264✔
2295
  while (1) {
2296
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
264✔
2297
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
264✔
2298
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
1,433✔
2299
      if (!rspObj.dataRsp.withSchema) {
1,169✔
UNCOV
2300
        goto end;
×
2301
      }
2302

2303
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
1,169✔
2304
      RAW_NULL_CHECK(tbName);
1,169✔
2305
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
1,169✔
2306
      RAW_NULL_CHECK(pSW);
1,169✔
2307
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
1,169✔
2308
      RAW_NULL_CHECK(pRetrieve);
1,169✔
2309
      void* rawData = getRawDataFromRes(pRetrieve);
1,169✔
2310
      RAW_NULL_CHECK(rawData);
1,169✔
2311

2312
      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
1,169✔
2313
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
1,169✔
2314
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
1,169✔
2315
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
1,169✔
2316

2317
      // find schema data info
2318
      SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, pName.tname, strlen(pName.tname));
1,169✔
2319
      STableMeta*    pTableMeta = NULL;
1,169✔
2320
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &pName,
1,169✔
2321
                                        &pTableMeta, pSW, rawData, retry));
2322
      char err[ERR_MSG_LEN] = {0};
1,169✔
2323
      code =
2324
          rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
1,169✔
2325
      if (code != TSDB_CODE_SUCCESS) {
1,169✔
UNCOV
2326
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
×
UNCOV
2327
        goto end;
×
2328
      }
2329
    }
2330
    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
264✔
2331
    launchQueryImpl(pRequest, pQuery, true, NULL);
264✔
2332
    code = pRequest->code;
264✔
2333

2334
    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
264✔
UNCOV
2335
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
×
UNCOV
2336
      qDestroyQuery(pQuery);
×
UNCOV
2337
      pQuery = NULL;
×
2338
      rspObj.resIter = -1;
×
2339
      continue;
×
2340
    }
2341
    break;
264✔
2342
  }
2343

2344
end:
264✔
2345
  uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
264✔
2346
  tDeleteSTaosxRsp(&rspObj.dataRsp);
264✔
2347
  void* pIter = taosHashIterate(pCreateTbHash, NULL);
264✔
2348
  while (pIter) {
932✔
2349
    tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
668✔
2350
    pIter = taosHashIterate(pCreateTbHash, pIter);
668✔
2351
  }
2352
  taosHashCleanup(pCreateTbHash);
264✔
2353
  tDecoderClear(&decoder);
264✔
2354
  qDestroyQuery(pQuery);
264✔
2355
  destroyRequest(pRequest);
264✔
2356
  return code;
264✔
2357
}
2358

UNCOV
2359
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
×
UNCOV
2360
  if (taos == NULL || data == NULL) {
×
UNCOV
2361
    uError("invalid parameter in %s", __func__);
×
UNCOV
2362
    return TSDB_CODE_INVALID_PARA;
×
2363
  }
UNCOV
2364
  int32_t   code = TSDB_CODE_SUCCESS;
×
UNCOV
2365
  SQuery*   pQuery = NULL;
×
UNCOV
2366
  SHashObj* pVgroupHash = NULL;
×
UNCOV
2367
  SMqRspObj rspObj = {0};
×
UNCOV
2368
  SDecoder  decoder = {0};
×
2369

UNCOV
2370
  SRequestObj*     pRequest = NULL;
×
2371
  SCatalog*        pCatalog = NULL;
×
2372
  SRequestConnInfo conn = {0};
×
2373

2374
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
×
UNCOV
2375
  uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
×
2376
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
×
2377

2378
  SHashObj* pVgHash = NULL;
×
2379
  SHashObj* pNameHash = NULL;
×
2380
  SHashObj* pMetaHash = NULL;
×
UNCOV
2381
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
×
2382
  int retry = 0;
×
2383
  while (1) {
×
2384
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
×
UNCOV
2385
    uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
×
2386
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
×
2387
    pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
×
2388
    RAW_NULL_CHECK(pVgroupHash);
×
UNCOV
2389
    pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
×
2390
    RAW_NULL_CHECK(pStmt->pVgDataBlocks);
×
2391

2392
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
×
2393
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
×
2394
      RAW_NULL_CHECK(tbName);
×
2395
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
×
2396
      RAW_NULL_CHECK(pRetrieve);
×
2397
      void* rawData = getRawDataFromRes(pRetrieve);
×
2398
      RAW_NULL_CHECK(rawData);
×
2399

2400
      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
×
2401
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
×
2402
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
×
UNCOV
2403
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
×
2404

2405
      // find schema data info
2406
      STableMeta* pTableMeta = NULL;
×
2407
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, NULL,
×
2408
                                        NULL, retry));
2409
      char err[ERR_MSG_LEN] = {0};
×
2410
      code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
×
UNCOV
2411
      if (code != TSDB_CODE_SUCCESS) {
×
2412
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
×
2413
        goto end;
×
2414
      }
2415
    }
UNCOV
2416
    taosHashCleanup(pVgroupHash);
×
UNCOV
2417
    pVgroupHash = NULL;
×
2418

2419
    RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
×
UNCOV
2420
    launchQueryImpl(pRequest, pQuery, true, NULL);
×
2421
    code = pRequest->code;
×
2422

2423
    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
×
2424
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
×
2425
      qDestroyQuery(pQuery);
×
UNCOV
2426
      pQuery = NULL;
×
UNCOV
2427
      rspObj.resIter = -1;
×
2428
      continue;
×
2429
    }
UNCOV
2430
    break;
×
2431
  }
2432

2433
end:
×
UNCOV
2434
  uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
×
2435
  tDeleteMqDataRsp(&rspObj.dataRsp);
×
2436
  tDecoderClear(&decoder);
×
2437
  qDestroyQuery(pQuery);
×
2438
  taosHashCleanup(pVgroupHash);
×
2439
  destroyRequest(pRequest);
×
2440
  return code;
×
2441
}
2442

2443
static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
14,987✔
2444
  if (pMetaRsp == NULL || meta == NULL) {
14,987✔
2445
    uError("invalid parameter in %s", __func__);
×
2446
    return;
×
2447
  }
2448
  if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) {
14,987✔
2449
    processCreateStb(pMetaRsp, meta);
3,775✔
2450
  } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_STB) {
11,212✔
2451
    processAlterStb(pMetaRsp, meta);
2,586✔
2452
  } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_STB) {
8,626✔
2453
    processDropSTable(pMetaRsp, meta);
400✔
2454
  } else if (pMetaRsp->resMsgType == TDMT_VND_CREATE_TABLE) {
8,226✔
2455
    processCreateTable(pMetaRsp, meta);
6,301✔
2456
  } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_TABLE) {
1,925✔
2457
    processAlterTable(pMetaRsp, meta);
1,487✔
2458
  } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_TABLE) {
438✔
2459
    processDropTable(pMetaRsp, meta);
271✔
2460
  } else if (pMetaRsp->resMsgType == TDMT_VND_DELETE) {
167✔
2461
    processDeleteTable(pMetaRsp, meta);
167✔
2462
  }
2463
}
2464

2465
static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
948✔
2466
  if (pMsgRsp == NULL || string == NULL) {
948✔
UNCOV
2467
    uError("invalid parameter in %s", __func__);
×
UNCOV
2468
    return;
×
2469
  }
2470
  SDecoder        coder = {0};
948✔
2471
  SMqBatchMetaRsp rsp = {0};
948✔
2472
  int32_t         code = 0;
948✔
2473
  cJSON*          pJson = NULL;
948✔
2474
  tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
948✔
2475
  if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
948✔
UNCOV
2476
    goto end;
×
2477
  }
2478

2479
  pJson = cJSON_CreateObject();
948✔
2480
  RAW_NULL_CHECK(pJson);
948✔
2481
  RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION));
948✔
2482
  cJSON* pMetaArr = cJSON_CreateArray();
948✔
2483
  RAW_NULL_CHECK(pMetaArr);
948✔
2484
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(pJson, "metas", pMetaArr));
948✔
2485

2486
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
948✔
2487
  for (int32_t i = 0; i < num; i++) {
6,482✔
2488
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
5,534✔
2489
    RAW_NULL_CHECK(len);
5,534✔
2490
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
5,534✔
2491
    RAW_NULL_CHECK(tmpBuf);
5,534✔
2492
    SDecoder   metaCoder = {0};
5,534✔
2493
    SMqMetaRsp metaRsp = {0};
5,534✔
2494
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
5,534✔
2495
    if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
5,534✔
UNCOV
2496
      goto end;
×
2497
    }
2498
    cJSON* pItem = NULL;
5,534✔
2499
    processSimpleMeta(&metaRsp, &pItem);
5,534✔
2500
    tDeleteMqMetaRsp(&metaRsp);
5,534✔
2501
    if (pItem != NULL) RAW_FALSE_CHECK(tmqAddJsonArrayItem(pMetaArr, pItem));
5,534✔
2502
  }
2503

2504
  tDeleteMqBatchMetaRsp(&rsp);
948✔
2505
  char* fullStr = cJSON_PrintUnformatted(pJson);
948✔
2506
  cJSON_Delete(pJson);
948✔
2507
  *string = fullStr;
948✔
2508
  return;
948✔
2509

UNCOV
2510
end:
×
UNCOV
2511
  cJSON_Delete(pJson);
×
UNCOV
2512
  tDeleteMqBatchMetaRsp(&rsp);
×
2513
}
2514

2515
char* tmq_get_json_meta(TAOS_RES* res) {
10,796✔
2516
  if (res == NULL) {
10,796✔
UNCOV
2517
    uError("invalid parameter in %s", __func__);
×
UNCOV
2518
    return NULL;
×
2519
  }
2520
  uDebug("tmq_get_json_meta res:%p", res);
10,796✔
2521
  if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res) && !TD_RES_TMQ_BATCH_META(res)) {
10,796✔
2522
    return NULL;
×
2523
  }
2524

2525
  char*      string = NULL;
10,796✔
2526
  SMqRspObj* rspObj = (SMqRspObj*)res;
10,796✔
2527
  if (TD_RES_TMQ_METADATA(res)) {
10,796✔
2528
    processAutoCreateTable(&rspObj->dataRsp, &string);
395✔
2529
  } else if (TD_RES_TMQ_BATCH_META(res)) {
10,401✔
2530
    processBatchMetaToJson(&rspObj->batchMetaRsp, &string);
948✔
2531
  } else if (TD_RES_TMQ_META(res)) {
9,453✔
2532
    cJSON* pJson = NULL;
9,453✔
2533
    processSimpleMeta(&rspObj->metaRsp, &pJson);
9,453✔
2534
    string = cJSON_PrintUnformatted(pJson);
9,453✔
2535
    cJSON_Delete(pJson);
9,453✔
2536
  } else {
UNCOV
2537
    uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res);
×
2538
  }
2539

2540
  uDebug("tmq_get_json_meta string:%s", string);
10,796✔
2541
  return string;
10,796✔
2542
}
2543

2544
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
12,787✔
2545

2546
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
3,617✔
2547
  if (pRsp == NULL) {
3,617✔
UNCOV
2548
    uError("invalid parameter in %s", __func__);
×
2549
    return TSDB_CODE_INVALID_PARA;
×
2550
  }
2551
  SEncoder coder = {0};
3,617✔
2552
  tEncoderInit(&coder, NULL, 0);
3,617✔
2553
  if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1;
3,617✔
2554
  if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1;
3,617✔
2555
  int32_t pos = coder.pos;
3,617✔
2556
  tEncoderClear(&coder);
3,617✔
2557
  return pos;
3,617✔
2558
}
2559

2560
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
2561
static int32_t  encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
3,617✔
2562
  if (raw == NULL || encodeFunc == NULL || rspObj == NULL) {
3,617✔
UNCOV
2563
    uError("invalid parameter in %s", __func__);
×
UNCOV
2564
    return TSDB_CODE_INVALID_PARA;
×
2565
  }
2566
  uint32_t len = 0;
3,617✔
2567
  int32_t  code = 0;
3,617✔
2568
  SEncoder encoder = {0};
3,617✔
2569
  void*    buf = NULL;
3,617✔
2570
  tEncodeSize(encodeFunc, rspObj, len, code);
3,617✔
2571
  if (code < 0) {
3,617✔
UNCOV
2572
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2573
    goto FAILED;
×
2574
  }
2575
  len += sizeof(int8_t) + sizeof(int32_t);
3,617✔
2576
  buf = taosMemoryCalloc(1, len);
3,617✔
2577
  if (buf == NULL) {
3,617✔
UNCOV
2578
    code = terrno;
×
UNCOV
2579
    goto FAILED;
×
2580
  }
2581
  tEncoderInit(&encoder, buf, len);
3,617✔
2582
  if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
3,617✔
UNCOV
2583
    code = TSDB_CODE_INVALID_MSG;
×
2584
    goto FAILED;
×
2585
  }
2586
  int32_t offsetLen = getOffSetLen(rspObj);
3,617✔
2587
  if (offsetLen <= 0) {
3,617✔
UNCOV
2588
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2589
    goto FAILED;
×
2590
  }
2591
  if (tEncodeI32(&encoder, offsetLen) < 0) {
3,617✔
UNCOV
2592
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2593
    goto FAILED;
×
2594
  }
2595
  if (encodeFunc(&encoder, rspObj) < 0) {
3,617✔
2596
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2597
    goto FAILED;
×
2598
  }
2599
  tEncoderClear(&encoder);
3,617✔
2600

2601
  raw->raw = buf;
3,617✔
2602
  raw->raw_len = len;
3,617✔
2603
  return code;
3,617✔
2604
FAILED:
×
2605
  tEncoderClear(&encoder);
×
UNCOV
2606
  taosMemoryFree(buf);
×
UNCOV
2607
  return code;
×
2608
}
2609

2610
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
14,132✔
2611
  if (raw == NULL || res == NULL) {
14,132✔
UNCOV
2612
    uError("invalid parameter in %s", __func__);
×
UNCOV
2613
    return TSDB_CODE_INVALID_PARA;
×
2614
  }
2615
  *raw = (tmq_raw_data){0};
14,132✔
2616
  SMqRspObj* rspObj = ((SMqRspObj*)res);
14,132✔
2617
  if (TD_RES_TMQ_META(res)) {
14,132✔
2618
    raw->raw = rspObj->metaRsp.metaRsp;
9,780✔
2619
    raw->raw_len = rspObj->metaRsp.metaRspLen >= 0 ? rspObj->metaRsp.metaRspLen : 0;
9,780✔
2620
    raw->raw_type = rspObj->metaRsp.resMsgType;
9,780✔
2621
    uDebug("tmq get raw type meta:%p", raw);
9,780✔
2622
  } else if (TD_RES_TMQ(res)) {
4,352✔
2623
    int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw);
3,353✔
2624
    if (code != 0) {
3,353✔
2625
      uError("tmq get raw type error:%d", terrno);
×
UNCOV
2626
      return code;
×
2627
    }
2628
    raw->raw_type = RES_TYPE__TMQ;
3,353✔
2629
    uDebug("tmq get raw type data:%p", raw);
3,353✔
2630
  } else if (TD_RES_TMQ_METADATA(res)) {
999✔
2631
    int32_t code = encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw);
264✔
2632
    if (code != 0) {
264✔
UNCOV
2633
      uError("tmq get raw type error:%d", terrno);
×
UNCOV
2634
      return code;
×
2635
    }
2636
    raw->raw_type = RES_TYPE__TMQ_METADATA;
264✔
2637
    uDebug("tmq get raw type metadata:%p", raw);
264✔
2638
  } else if (TD_RES_TMQ_BATCH_META(res)) {
735✔
2639
    raw->raw = rspObj->batchMetaRsp.pMetaBuff;
735✔
2640
    raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
735✔
2641
    raw->raw_type = rspObj->resType;
735✔
2642
    uDebug("tmq get raw batch meta:%p", raw);
735✔
UNCOV
2643
  } else if (TD_RES_TMQ_RAW(res)) {
×
UNCOV
2644
    raw->raw = rspObj->dataRsp.rawData;
×
2645
    rspObj->dataRsp.rawData = NULL;
×
2646
    raw->raw_len = rspObj->dataRsp.len;
×
UNCOV
2647
    raw->raw_type = rspObj->resType;
×
UNCOV
2648
    uDebug("tmq get raw raw:%p", raw);
×
2649
  } else {
UNCOV
2650
    uError("tmq get raw error type:%d", *(int8_t*)res);
×
UNCOV
2651
    return TSDB_CODE_TMQ_INVALID_MSG;
×
2652
  }
2653
  return TSDB_CODE_SUCCESS;
14,132✔
2654
}
2655

2656
void tmq_free_raw(tmq_raw_data raw) {
12,722✔
2657
  uDebug("tmq free raw data type:%d", raw.raw_type);
12,722✔
2658
  if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
12,722✔
2659
    taosMemoryFree(raw.raw);
2,207✔
2660
  } else if (raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL) {
10,515✔
UNCOV
2661
    taosMemoryFree(POINTER_SHIFT(raw.raw, -sizeof(SMqRspHead)));
×
2662
  }
2663
  (void)memset(terrMsg, 0, ERR_MSG_LEN);
12,722✔
2664
}
12,722✔
2665

2666
static int32_t writeRawInit() {
16,856✔
2667
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
17,474✔
2668
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
618✔
2669
    if (old == 0) {
618✔
2670
      int32_t code = initRawCacheHash();
618✔
2671
      if (code != 0) {
618✔
UNCOV
2672
        uError("tmq writeRawImpl init error:%d", code);
×
2673
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
UNCOV
2674
        return code;
×
2675
      }
2676
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
618✔
2677
    }
2678
  }
2679

2680
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
16,856✔
UNCOV
2681
    return TSDB_CODE_INTERNAL_ERROR;
×
2682
  }
2683
  return 0;
16,856✔
2684
}
2685

2686
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
16,856✔
2687
  if (taos == NULL || buf == NULL) {
16,856✔
UNCOV
2688
    uError("invalid parameter in %s", __func__);
×
UNCOV
2689
    return TSDB_CODE_INVALID_PARA;
×
2690
  }
2691
  if (writeRawInit() != 0) {
16,856✔
UNCOV
2692
    return TSDB_CODE_INTERNAL_ERROR;
×
2693
  }
2694

2695
  if (type == TDMT_VND_CREATE_STB) {
16,856✔
2696
    return taosCreateStb(taos, buf, len);
3,588✔
2697
  } else if (type == TDMT_VND_ALTER_STB) {
13,268✔
2698
    return taosCreateStb(taos, buf, len);
2,160✔
2699
  } else if (type == TDMT_VND_DROP_STB) {
11,108✔
2700
    return taosDropStb(taos, buf, len);
400✔
2701
  } else if (type == TDMT_VND_CREATE_TABLE) {
10,708✔
2702
    return taosCreateTable(taos, buf, len);
5,900✔
2703
  } else if (type == TDMT_VND_ALTER_TABLE) {
4,808✔
2704
    return taosAlterTable(taos, buf, len);
1,428✔
2705
  } else if (type == TDMT_VND_DROP_TABLE) {
3,380✔
2706
    return taosDropTable(taos, buf, len);
271✔
2707
  } else if (type == TDMT_VND_DELETE) {
3,109✔
2708
    return taosDeleteData(taos, buf, len);
167✔
2709
  } else if (type == RES_TYPE__TMQ_METADATA) {
2,942✔
2710
    return tmqWriteRawMetaDataImpl(taos, buf, len);
264✔
2711
  } else if (type == RES_TYPE__TMQ_RAWDATA) {
2,678✔
UNCOV
2712
    return tmqWriteRawRawDataImpl(taos, buf, len);
×
2713
  } else if (type == RES_TYPE__TMQ) {
2,678✔
2714
    return tmqWriteRawDataImpl(taos, buf, len);
1,943✔
2715
  } else if (type == RES_TYPE__TMQ_BATCH_META) {
735✔
2716
    return tmqWriteBatchMetaDataImpl(taos, buf, len);
735✔
2717
  }
UNCOV
2718
  return TSDB_CODE_INVALID_PARA;
×
2719
}
2720

2721
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
12,156✔
2722
  if (taos == NULL || raw.raw == NULL || raw.raw_len <= 0) {
12,156✔
2723
    SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
621✔
2724
    return TSDB_CODE_INVALID_PARA;
621✔
2725
  }
2726
  taosClearErrMsg();  // clear global error message
11,535✔
2727

2728
  return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
11,535✔
2729
}
2730

2731
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) {
735✔
2732
  if (taos == NULL || meta == NULL) {
735✔
UNCOV
2733
    uError("invalid parameter in %s", __func__);
×
UNCOV
2734
    return TSDB_CODE_INVALID_PARA;
×
2735
  }
2736
  SMqBatchMetaRsp rsp = {0};
735✔
2737
  SDecoder        coder = {0};
735✔
2738
  int32_t         code = TSDB_CODE_SUCCESS;
735✔
2739

2740
  // decode and process req
2741
  tDecoderInit(&coder, meta, metaLen);
735✔
2742
  if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
735✔
UNCOV
2743
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
2744
    goto end;
×
2745
  }
2746
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
735✔
2747
  for (int32_t i = 0; i < num; i++) {
6,056✔
2748
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
5,321✔
2749
    RAW_NULL_CHECK(len);
5,321✔
2750
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
5,321✔
2751
    RAW_NULL_CHECK(tmpBuf);
5,321✔
2752
    SDecoder   metaCoder = {0};
5,321✔
2753
    SMqMetaRsp metaRsp = {0};
5,321✔
2754
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
5,321✔
2755
    if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
5,321✔
2756
      code = TSDB_CODE_INVALID_PARA;
×
UNCOV
2757
      goto end;
×
2758
    }
2759
    code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
5,321✔
2760
    tDeleteMqMetaRsp(&metaRsp);
5,321✔
2761
    if (code != TSDB_CODE_SUCCESS) {
5,321✔
UNCOV
2762
      goto end;
×
2763
    }
2764
  }
2765

2766
end:
735✔
2767
  tDeleteMqBatchMetaRsp(&rsp);
735✔
2768
  return code;
735✔
2769
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc