• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3620

21 Feb 2025 09:00AM UTC coverage: 63.573% (+0.2%) from 63.423%
#3620

push

travis-ci

web-flow
ci: taosBenchmark add coverage cases branch 3.0 (#29788)

* fix: add unit test for taos-tools

* fix: only .cpp include

* fix: remove no use function

* fix: restore toolsSys.c

* fix: add toolsSys case

* fix: rebuild error fixed

* fix: fix build error

* fix: support get vgroups with core and memory limit

* fix: build error for strcasecmp

* fix: add insertBasic.py case

* fix: add command line set vgroups=3

* fix: change with ns database

* toolscJson read with int replace float and add insertPrecison.py

* fix: add insertBindVGroup.json case

* fix: remove public fun removeQuotation

* fix: vgroups change method

* fix: memory leak for runInsertLimitThread slot

* insertPrecision.py word write wrong

* fix: check isFloat number

* fix: vgroups change logic error

* fix: insertBasic.py real and expect error

* fix: adjust default vgroups

* fix: adjust default vgroups modify comment

148962 of 300203 branches covered (49.62%)

Branch coverage included in aggregate %.

15 of 16 new or added lines in 1 file covered. (93.75%)

2018 existing lines in 133 files now uncovered.

233201 of 300933 relevant lines covered (77.49%)

18174406.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.04
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "parser.h"
19
#include "tcol.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tdef.h"
23
#include "tglobal.h"
24
#include "tmsgtype.h"
25

26
/*
 * Error-propagation helpers used by the JSON builders in this file.
 * Each macro expects the enclosing function to declare an `int32_t code`
 * local and an `end:` label. The argument is parenthesized so that a
 * compound expression (e.g. `a && b`) is tested as a whole.
 */

/* Jump to `end` with code = terrno when the expression is NULL. */
#define RAW_NULL_CHECK(c) \
  do {                    \
    if ((c) == NULL) {    \
      code = terrno;      \
      goto end;           \
    }                     \
  } while (0)

/* Jump to `end` with code = TSDB_CODE_INVALID_PARA when the expression is false. */
#define RAW_FALSE_CHECK(c)           \
  do {                               \
    if (!(c)) {                      \
      code = TSDB_CODE_INVALID_PARA; \
      goto end;                      \
    }                                \
  } while (0)

/* Store the expression in `code` and jump to `end` when it is non-zero. */
#define RAW_RETURN_CHECK(c) \
  do {                      \
    code = (c);             \
    if (code != 0) {        \
      goto end;             \
    }                       \
  } while (0)
49

50
#define LOG_ID_TAG   "connId:0x%" PRIx64 ",QID:0x%" PRIx64
51
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
52

53
#define TMQ_META_VERSION "1.0"
54

55
/*
 * Attaches `item` to `object` under the key `string`.
 * When cJSON_AddItemToObject reports failure the item is deleted here, so the
 * caller must not touch it again. Returns true on success, false otherwise.
 */
static bool tmqAddJsonObjectItem(cJSON *object, const char *string, cJSON *item) {
  if (cJSON_AddItemToObject(object, string, item)) {
    return true;
  }
  cJSON_Delete(item);
  return false;
}
62
/*
 * Appends `item` to `array`.
 * When cJSON_AddItemToArray reports failure the item is deleted here, so the
 * caller must not touch it again. Returns true on success, false otherwise.
 */
static bool tmqAddJsonArrayItem(cJSON *array, cJSON *item) {
  if (cJSON_AddItemToArray(array, item)) {
    return true;
  }
  cJSON_Delete(item);
  return false;
}
69

70

71
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen);
72
/*
 * Offsets `suid` by the MurmurHash3_32 hash of the database name `db`.
 * A NULL `db` leaves the id unchanged.
 */
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
  tb_uid_t result = suid;
  if (db != NULL) {
    result += MurmurHash3_32(db, strlen(db));
  }
  return result;
}
78
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t,
111✔
79
                                 SColCmprWrapper* pColCmprRow, cJSON** pJson) {
80
  if (schemaRow == NULL || name == NULL || pColCmprRow == NULL || pJson == NULL) {
111!
81
    uError("invalid parameter, schemaRow:%p, name:%p, pColCmprRow:%p, pJson:%p", schemaRow, name, pColCmprRow, pJson);
×
82
    return;
×
83
  }
84
  int32_t code = TSDB_CODE_SUCCESS;
111✔
85
  int8_t  buildDefaultCompress = 0;
111✔
86
  if (pColCmprRow->nCols <= 0) {
111!
87
    buildDefaultCompress = 1;
×
88
  }
89

90
  char*  string = NULL;
111✔
91
  cJSON* json = cJSON_CreateObject();
111✔
92
  RAW_NULL_CHECK(json);
111!
93
  cJSON* type = cJSON_CreateString("create");
111✔
94
  RAW_NULL_CHECK(type);
111!
95

96
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
111!
97
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
111✔
98
  RAW_NULL_CHECK(tableType);
111!
99
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
111!
100
  cJSON* tableName = cJSON_CreateString(name);
111✔
101
  RAW_NULL_CHECK(tableName);
111!
102
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
111!
103

104
  cJSON* columns = cJSON_CreateArray();
111✔
105
  RAW_NULL_CHECK(columns);
111!
106
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "columns", columns));
111!
107

108
  for (int i = 0; i < schemaRow->nCols; i++) {
651✔
109
    cJSON* column = cJSON_CreateObject();
540✔
110
    RAW_NULL_CHECK(column);
540!
111
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(columns, column));
540!
112
    SSchema* s = schemaRow->pSchema + i;
540✔
113
    cJSON*   cname = cJSON_CreateString(s->name);
540✔
114
    RAW_NULL_CHECK(cname);
540!
115
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "name", cname));
540!
116
    cJSON* ctype = cJSON_CreateNumber(s->type);
540✔
117
    RAW_NULL_CHECK(ctype);
540!
118
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "type", ctype));
540!
119
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
608!
120
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
68✔
121
      cJSON*  cbytes = cJSON_CreateNumber(length);
68✔
122
      RAW_NULL_CHECK(cbytes);
68!
123
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
68!
124
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
472✔
125
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
9✔
126
      cJSON*  cbytes = cJSON_CreateNumber(length);
9✔
127
      RAW_NULL_CHECK(cbytes);
9!
128
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
9!
129
    }
130
    cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
540✔
131
    RAW_NULL_CHECK(isPk);
540!
132
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "isPrimarykey", isPk));
540!
133

134
    if (pColCmprRow == NULL) {
540!
135
      continue;
×
136
    }
137

138
    uint32_t alg = 0;
540✔
139
    if (buildDefaultCompress) {
540!
140
      alg = createDefaultColCmprByType(s->type);
×
141
    } else {
142
      SColCmpr* pColCmpr = pColCmprRow->pColCmpr + i;
540✔
143
      alg = pColCmpr->alg;
540✔
144
    }
145
    const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(alg));
540✔
146
    RAW_NULL_CHECK(encode);
540!
147
    const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(alg));
540✔
148
    RAW_NULL_CHECK(compress);
540!
149
    const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(alg));
540✔
150
    RAW_NULL_CHECK(level);
540!
151

152
    cJSON* encodeJson = cJSON_CreateString(encode);
540✔
153
    RAW_NULL_CHECK(encodeJson);
540!
154
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "encode", encodeJson));
540!
155

156
    cJSON* compressJson = cJSON_CreateString(compress);
540✔
157
    RAW_NULL_CHECK(compressJson);
540!
158
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "compress", compressJson));
540!
159

160
    cJSON* levelJson = cJSON_CreateString(level);
540✔
161
    RAW_NULL_CHECK(levelJson);
540!
162
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "level", levelJson));
540!
163
  }
164

165
  cJSON* tags = cJSON_CreateArray();
111✔
166
  RAW_NULL_CHECK(tags);
111!
167
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
111!
168

169
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
338✔
170
    cJSON* tag = cJSON_CreateObject();
227✔
171
    RAW_NULL_CHECK(tag);
227!
172
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
227!
173
    SSchema* s = schemaTag->pSchema + i;
227✔
174
    cJSON*   tname = cJSON_CreateString(s->name);
227✔
175
    RAW_NULL_CHECK(tname);
227!
176
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
227!
177
    cJSON* ttype = cJSON_CreateNumber(s->type);
227✔
178
    RAW_NULL_CHECK(ttype);
227!
179
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
227!
180
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
235!
181
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
8✔
182
      cJSON*  cbytes = cJSON_CreateNumber(length);
8✔
183
      RAW_NULL_CHECK(cbytes);
8!
184
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
8!
185
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
219✔
186
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
65✔
187
      cJSON*  cbytes = cJSON_CreateNumber(length);
65✔
188
      RAW_NULL_CHECK(cbytes);
65!
189
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
65!
190
    }
191
  }
192

193
end:
111✔
194
  *pJson = json;
111✔
195
}
196

197
/*
 * Adds the compression options packed in `para` to `json`.
 *
 * `para` is split with COMPRESS_L1_TYPE_U32 ("encode"), COMPRESS_L2_TYPE_U32
 * ("compress") and COMPRESS_L2_TYPE_LEVEL_U32 ("level"). Every part that is
 * non-zero is emitted; a zero part is skipped. Previously the function
 * returned right after the first non-zero part, so a value carrying several
 * options lost all but the first one.
 *
 * Returns TSDB_CODE_INVALID_PARA when `json` is NULL or an item cannot be
 * attached, terrno when a name lookup or node creation yields NULL, and
 * 0 otherwise.
 */
static int32_t setCompressOption(cJSON* json, uint32_t para) {
  if (json == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  uint8_t encode = COMPRESS_L1_TYPE_U32(para);
  uint8_t compress = COMPRESS_L2_TYPE_U32(para);
  uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para);

  if (encode != 0) {
    const char* encodeStr = columnEncodeStr(encode);
    RAW_NULL_CHECK(encodeStr);
    cJSON* encodeJson = cJSON_CreateString(encodeStr);
    RAW_NULL_CHECK(encodeJson);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "encode", encodeJson));
  }

  if (compress != 0) {
    const char* compressStr = columnCompressStr(compress);
    RAW_NULL_CHECK(compressStr);
    cJSON* compressJson = cJSON_CreateString(compressStr);
    RAW_NULL_CHECK(compressJson);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "compress", compressJson));
  }

  if (level != 0) {
    const char* levelStr = columnLevelStr(level);
    RAW_NULL_CHECK(levelStr);
    cJSON* levelJson = cJSON_CreateString(levelStr);
    RAW_NULL_CHECK(levelJson);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "level", levelJson));
  }

end:
  return code;
}
233
/*
 * Creates a JSON string from `value` and attaches it to `json` under `key`.
 * Returns false if the node cannot be created or attached.
 */
static bool tmqAddAlterStringItem(cJSON* json, const char* key, const char* value) {
  cJSON* item = cJSON_CreateString(value);
  return item != NULL && tmqAddJsonObjectItem(json, key, item);
}

/*
 * Adds "colType" and, for variable-length types only, "colLength" to `json`.
 * The length is `bytes` minus the var-string header, additionally divided by
 * TSDB_NCHAR_SIZE for NCHAR. Returns false on any creation or attach failure.
 */
static bool tmqAddAlterColTypeItems(cJSON* json, int32_t type, int32_t bytes) {
  cJSON* colType = cJSON_CreateNumber(type);
  if (colType == NULL || !tmqAddJsonObjectItem(json, "colType", colType)) {
    return false;
  }

  int32_t length = 0;
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_GEOMETRY) {
    length = bytes - VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_NCHAR) {
    length = (bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  } else {
    return true;  // fixed-size types carry no "colLength" entry
  }
  cJSON* cbytes = cJSON_CreateNumber(length);
  return cbytes != NULL && tmqAddJsonObjectItem(json, "colLength", cbytes);
}

/*
 * Deserializes an SMAlterStbReq from `alterData` and builds the JSON
 * description of the "alter super table" event in *pJson.
 *
 * The object always starts with "type", "tableType", "tableName" and
 * "alterType"; the remaining keys depend on the alter type:
 *   add tag/column, update tag/column bytes : colName, colType[, colLength]
 *   add column with compress option         : the same plus the compression options
 *   drop tag/column                         : colName
 *   rename tag/column                       : colName, colNewName
 *   update column compress                  : colName plus the compression options
 *
 * *pJson is NULL when deserialization or the root allocation fails. On a later
 * failure the function stops and stores the partially built object. The
 * request is always released before returning.
 */
static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) {
  if (alterData == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SMAlterStbReq req = {0};
  cJSON*        json = NULL;
  int32_t       code = 0;

  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) {
    goto end;
  }

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  RAW_FALSE_CHECK(tmqAddAlterStringItem(json, "type", "alter"));
  SName name = {0};
  RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
  RAW_FALSE_CHECK(tmqAddAlterStringItem(json, "tableType", "super"));
  RAW_FALSE_CHECK(tmqAddAlterStringItem(json, "tableName", name.tname));

  cJSON* alterType = cJSON_CreateNumber(req.alterType);
  RAW_NULL_CHECK(alterType);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));

  switch (req.alterType) {
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN:
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      RAW_FALSE_CHECK(tmqAddAlterStringItem(json, "colName", field->name));
      RAW_FALSE_CHECK(tmqAddAlterColTypeItems(json, field->type, field->bytes));
      break;
    }
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      SFieldWithOptions* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      RAW_FALSE_CHECK(tmqAddAlterStringItem(json, "colName", field->name));
      RAW_FALSE_CHECK(tmqAddAlterColTypeItems(json, field->type, field->bytes));
      RAW_RETURN_CHECK(setCompressOption(json, field->compress));
      break;
    }
    case TSDB_ALTER_TABLE_DROP_TAG:
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      RAW_FALSE_CHECK(tmqAddAlterStringItem(json, "colName", field->name));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      // Entry 0 holds the current name, entry 1 the new one.
      TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(oldField);
      TAOS_FIELD* newField = taosArrayGet(req.pFields, 1);
      RAW_NULL_CHECK(newField);
      RAW_FALSE_CHECK(tmqAddAlterStringItem(json, "colName", oldField->name));
      RAW_FALSE_CHECK(tmqAddAlterStringItem(json, "colNewName", newField->name));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      RAW_FALSE_CHECK(tmqAddAlterStringItem(json, "colName", field->name));
      // NOTE(review): `bytes` is handed over as the packed compression value here;
      // presumably the request reuses that field for this alter type - confirm.
      RAW_RETURN_CHECK(setCompressOption(json, field->bytes));
      break;
    }
    default:
      break;
  }

end:
  tFreeSMAltertbReq(&req);
  *pJson = json;
}
379

380
/*
 * Decodes the SVCreateStbReq that follows the SMsgHead in `metaRsp` and, when
 * decoding succeeds, builds the "create super table" JSON into *pJson.
 * *pJson is left untouched when decoding fails.
 */
static void processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDecoder       decoder = {0};
  SVCreateStbReq stbReq = {0};

  uDebug("create stable data:%p", metaRsp);

  // The payload starts right after the message head.
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&decoder, &stbReq) >= 0) {
    buildCreateTableJson(&stbReq.schemaRow, &stbReq.schemaTag, stbReq.name, stbReq.suid, TSDB_SUPER_TABLE,
                         &stbReq.colCmpr, pJson);
  }

  uDebug("create stable return");
  tDecoderClear(&decoder);
}
403

404
/*
 * Decodes the SVCreateStbReq that follows the SMsgHead in `metaRsp` and, when
 * decoding succeeds, builds the "alter super table" JSON into *pJson from the
 * original alter request bytes carried in the decoded request.
 * *pJson is left untouched when decoding fails.
 */
static void processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDecoder       decoder = {0};
  SVCreateStbReq stbReq = {0};
  uDebug("alter stable data:%p", metaRsp);

  // The payload starts right after the message head.
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&decoder, &stbReq) >= 0) {
    buildAlterSTableJson(stbReq.alterOriData, stbReq.alterOriDataLen, pJson);
  }

  uDebug("alter stable return");
  tDecoderClear(&decoder);
}
427

428
/*
 * Fills `json` with the description of one child table taken from `pCreateReq`:
 * "tableName", "using" (the super table name), "tagNum" and a "tags" array
 * holding one {name, type, value} object per tag.
 *
 * A JSON-typed tag produces a single array entry whose value is the text
 * returned by parseTagDatatoJson; an empty JSON tag (nTag == 0) produces no
 * entry. Other tags are emitted as strings (variable-length types) or numbers.
 * The function stops at the first failure and keeps what was already added.
 */
static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
  if (pCreateReq == NULL || json == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  int32_t code = 0;
  STag*   pTag = (STag*)pCreateReq->ctb.pTag;
  SArray* tagNames = pCreateReq->ctb.tagName;
  uint8_t tagNum = pCreateReq->ctb.tagNum;
  SArray* tagVals = NULL;   // filled by tTagToValArray, destroyed at `end`
  char*   jsonText = NULL;  // filled by parseTagDatatoJson, freed at `end`

  cJSON* tableName = cJSON_CreateString(pCreateReq->name);
  RAW_NULL_CHECK(tableName);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));

  cJSON* stbName = cJSON_CreateString(pCreateReq->ctb.stbName);
  RAW_NULL_CHECK(stbName);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "using", stbName));

  cJSON* tagCount = cJSON_CreateNumber(tagNum);
  RAW_NULL_CHECK(tagCount);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tagNum", tagCount));

  cJSON* tags = cJSON_CreateArray();
  RAW_NULL_CHECK(tags);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));

  RAW_RETURN_CHECK(tTagToValArray(pTag, &tagVals));

  if (tTagIsJson(pTag)) {
    if (pTag->nTag == 0) {
      uError("p->nTag == 0");
      goto end;
    }
    parseTagDatatoJson(pTag, &jsonText, NULL);
    RAW_NULL_CHECK(jsonText);

    cJSON* entry = cJSON_CreateObject();
    RAW_NULL_CHECK(entry);
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, entry));

    STagVal* firstVal = taosArrayGet(tagVals, 0);
    RAW_NULL_CHECK(firstVal);
    char* firstName = taosArrayGet(tagNames, 0);
    RAW_NULL_CHECK(firstName);

    cJSON* nameNode = cJSON_CreateString(firstName);
    RAW_NULL_CHECK(nameNode);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(entry, "name", nameNode));

    cJSON* typeNode = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
    RAW_NULL_CHECK(typeNode);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(entry, "type", typeNode));

    cJSON* valueNode = cJSON_CreateString(jsonText);
    RAW_NULL_CHECK(valueNode);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(entry, "value", valueNode));
  } else {
    for (int idx = 0; idx < taosArrayGetSize(tagVals); idx++) {
      STagVal* tagVal = (STagVal*)taosArrayGet(tagVals, idx);
      RAW_NULL_CHECK(tagVal);

      cJSON* entry = cJSON_CreateObject();
      RAW_NULL_CHECK(entry);
      RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, entry));

      char* tagNameStr = taosArrayGet(tagNames, idx);
      RAW_NULL_CHECK(tagNameStr);
      cJSON* nameNode = cJSON_CreateString(tagNameStr);
      RAW_NULL_CHECK(nameNode);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(entry, "name", nameNode));

      cJSON* typeNode = cJSON_CreateNumber(tagVal->type);
      RAW_NULL_CHECK(typeNode);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(entry, "type", typeNode));

      cJSON* valueNode = NULL;
      if (IS_VAR_DATA_TYPE(tagVal->type)) {
        // VARBINARY gets twice the data size plus extra room; other types the data size plus 3.
        int64_t bufSize = 0;
        if (tagVal->type == TSDB_DATA_TYPE_VARBINARY) {
          bufSize = tagVal->nData * 2 + 2 + 3;
        } else {
          bufSize = tagVal->nData + 3;
        }
        char* text = taosMemoryCalloc(bufSize, 1);
        RAW_NULL_CHECK(text);
        if (dataConverToStr(text, bufSize, tagVal->type, tagVal->pData, tagVal->nData, NULL) != TSDB_CODE_SUCCESS) {
          taosMemoryFree(text);
          goto end;
        }

        // cJSON_CreateString copies the text, so the scratch buffer is released right away.
        valueNode = cJSON_CreateString(text);
        taosMemoryFree(text);
        RAW_NULL_CHECK(valueNode);
      } else {
        double number = 0;
        GET_TYPED_DATA(number, double, tagVal->type, &tagVal->i64);
        valueNode = cJSON_CreateNumber(number);
        RAW_NULL_CHECK(valueNode);
      }

      RAW_FALSE_CHECK(tmqAddJsonObjectItem(entry, "value", valueNode));
    }
  }

end:
  taosMemoryFree(jsonText);
  taosArrayDestroy(tagVals);
}
531

532
/*
 * Builds the JSON description of a "create child table" event into *pJson.
 *
 * The top-level object describes the first request in `pCreateReq`. When the
 * batch holds more than one request, "createList" additionally contains one
 * object per request (the first one included); otherwise it stays empty.
 * The function stops at the first failure and still stores the partially
 * built object (NULL if the root object could not be created).
 */
static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) {
  if (pCreateReq == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  int32_t code = 0;
  cJSON*  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);

  cJSON* typeNode = cJSON_CreateString("create");
  RAW_NULL_CHECK(typeNode);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "type", typeNode));

  cJSON* tableTypeNode = cJSON_CreateString("child");
  RAW_NULL_CHECK(tableTypeNode);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableType", tableTypeNode));

  buildChildElement(root, pCreateReq);

  cJSON* list = cJSON_CreateArray();
  RAW_NULL_CHECK(list);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "createList", list));

  if (nReqs > 1) {
    for (int idx = 0; idx < nReqs; idx++) {
      cJSON* element = cJSON_CreateObject();
      RAW_NULL_CHECK(element);
      buildChildElement(element, pCreateReq + idx);
      RAW_FALSE_CHECK(tmqAddJsonArrayItem(list, element));
    }
  }

end:
  *pJson = root;
}
564

565
/*
 * Decodes the SVCreateTbBatchReq that follows the SMsgHead in `metaRsp` and
 * builds the matching "create table" JSON into *pJson. The type of the first
 * request selects the builder: child tables go through buildCreateCTableJson
 * with the whole batch, a normal table through buildCreateTableJson with the
 * first request only. *pJson is left untouched when decoding fails, the batch
 * is empty, or the first request is of any other type.
 */
static void processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SVCreateTbBatchReq batch = {0};
  SDecoder           dec = {0};

  uDebug("create table data:%p", metaRsp);

  // The payload starts right after the message head.
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&dec, payload, payloadLen);

  if (tDecodeSVCreateTbBatchReq(&dec, &batch) >= 0 && batch.nReqs > 0) {
    SVCreateTbReq* first = batch.pReqs;
    if (first->type == TSDB_CHILD_TABLE) {
      buildCreateCTableJson(batch.pReqs, batch.nReqs, pJson);
    } else if (first->type == TSDB_NORMAL_TABLE) {
      buildCreateTableJson(&first->ntb.schemaRow, NULL, first->name, first->uid, TSDB_NORMAL_TABLE, &first->colCmpr,
                           pJson);
    }
  }

  uDebug("create table return");
  tDeleteSVCreateTbBatchReq(&batch);
  tDecoderClear(&dec);
}
598

599
/*
 * Decodes every auto-create-table request carried by `rsp` and, if all of
 * them decode as child tables, stores their compact JSON text in *string
 * (the result of cJSON_PrintUnformatted).
 *
 * *string is not written when the response carries no requests, an allocation
 * fails, a request fails to decode, or a request is not a child table.
 * All decoders and decoded requests are released before returning.
 */
static void processAutoCreateTable(SMqDataRsp* rsp, char** string) {
  if (rsp == NULL || string == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDecoder*      decoder = NULL;
  SVCreateTbReq* pCreateReq = NULL;
  int32_t        code = 0;
  uDebug("auto create table data:%p", rsp);
  if (rsp->createTableNum <= 0) {
    uError("processAutoCreateTable rsp->createTableNum <= 0");
    goto end;
  }

  // One decoder and one request slot per table; zero-filled so the cleanup
  // loop below can walk slots that were never decoded.
  decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
  RAW_NULL_CHECK(decoder);
  pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
  RAW_NULL_CHECK(pCreateReq);

  // loop to create table
  for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) {
    // decode
    void** data = taosArrayGet(rsp->createTableReq, iReq);
    RAW_NULL_CHECK(data);
    int32_t* len = taosArrayGet(rsp->createTableLen, iReq);
    RAW_NULL_CHECK(len);
    tDecoderInit(&decoder[iReq], *data, *len);
    if (tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq) < 0) {
      goto end;
    }

    if (pCreateReq[iReq].type != TSDB_CHILD_TABLE) {
      uError("processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE");
      goto end;
    }
  }
  cJSON* pJson = NULL;
  buildCreateCTableJson(pCreateReq, rsp->createTableNum, &pJson);
  *string = cJSON_PrintUnformatted(pJson);
  cJSON_Delete(pJson);

end:
  // *string is still NULL on the failure paths (and when printing fails);
  // passing a null pointer to %s is undefined behavior, so log a placeholder.
  uDebug("auto created table return, sql json:%s", *string != NULL ? *string : "(null)");
  for (int i = 0; decoder && pCreateReq && i < rsp->createTableNum; i++) {
    tDecoderClear(&decoder[i]);
    taosMemoryFreeClear(pCreateReq[i].comment);
    if (pCreateReq[i].type == TSDB_CHILD_TABLE) {
      taosArrayDestroy(pCreateReq[i].ctb.tagName);
    }
  }
  taosMemoryFree(decoder);
  taosMemoryFree(pCreateReq);
}
652

653
// Adds a "colLength" number to `json` for variable-length column types:
//   BINARY / VARBINARY / GEOMETRY -> colBytes minus the var-string header
//   NCHAR                         -> the same, divided by TSDB_NCHAR_SIZE
// Other types add nothing. Returns 0 on success, otherwise the code recorded
// by the RAW_* check macros.
static int32_t tmqAddColLengthJson(cJSON* json, int32_t colType, int32_t colBytes) {
  int32_t code = 0;
  int32_t length = 0;

  if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_VARBINARY ||
      colType == TSDB_DATA_TYPE_GEOMETRY) {
    length = colBytes - VARSTR_HEADER_SIZE;
  } else if (colType == TSDB_DATA_TYPE_NCHAR) {
    length = (colBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  } else {
    return code;
  }

  cJSON* cbytes = cJSON_CreateNumber(length);
  RAW_NULL_CHECK(cbytes);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));

end:
  return code;
}

// Decodes the alter-table request carried in `metaRsp` (the payload after the
// SMsgHead) and describes it as a JSON object stored in `*pJson`.
//
// Every object carries "type" ("alter"), "tableType" ("child" for tag-value
// updates, "normal" otherwise), "tableName" and "alterType" (the numeric
// action). Further keys depend on the action: "colName", "colType",
// "colLength", "colNewName", "colValue", "colValueNull", "tags" and whatever
// setCompressOption() adds.
//
// If either argument is NULL the function logs and returns without touching
// `*pJson`. If decoding fails `*pJson` is set to NULL. If a later step fails,
// `*pJson` receives the object as built so far.
static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  cJSON*       json = NULL;
  int32_t      code = 0;  // written by the RAW_* check macros

  uDebug("alter table data:%p", metaRsp);
  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
    uError("tDecodeSVAlterTbReq error");
    goto end;
  }

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("alter");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
                                                vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL
                                            ? "child"
                                            : "normal");
  RAW_NULL_CHECK(tableType);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
  RAW_NULL_CHECK(tableName);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
  RAW_NULL_CHECK(alterType);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));

  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      RAW_NULL_CHECK(colType);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));

      RAW_RETURN_CHECK(tmqAddColLengthJson(json, vAlterTbReq.type, vAlterTbReq.bytes));
      break;
    }
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      RAW_NULL_CHECK(colType);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));

      RAW_RETURN_CHECK(tmqAddColLengthJson(json, vAlterTbReq.type, vAlterTbReq.bytes));
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
      RAW_NULL_CHECK(colType);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));

      RAW_RETURN_CHECK(tmqAddColLengthJson(json, vAlterTbReq.colModType, vAlterTbReq.colModBytes));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
      RAW_NULL_CHECK(colNewName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
      RAW_NULL_CHECK(tagName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", tagName));

      bool isNull = vAlterTbReq.isNull;
      if (!isNull && vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        // A JSON tag with no entries is reported as a null value. A missing
        // tag buffer is treated the same way instead of being dereferenced.
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag == NULL || jsonTag->nTag == 0) isNull = true;
      }
      if (!isNull) {
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          if (!tTagIsJson(vAlterTbReq.pTagVal)) {
            uError("processAlterTable isJson false");
            goto end;
          }
          parseTagDatatoJson(vAlterTbReq.pTagVal, &buf, NULL);
          if (buf == NULL) {
            uError("parseTagDatatoJson failed, buf == NULL");
            goto end;
          }
        } else {
          int64_t bufSize = 0;
          if (vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY) {
            bufSize = vAlterTbReq.nTagVal * 2 + 2 + 3;
          } else {
            bufSize = vAlterTbReq.nTagVal + 32;
          }
          buf = taosMemoryCalloc(bufSize, 1);
          RAW_NULL_CHECK(buf);
          if (dataConverToStr(buf, bufSize, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL) !=
              TSDB_CODE_SUCCESS) {
            taosMemoryFree(buf);
            goto end;
          }
        }

        cJSON* colValue = cJSON_CreateString(buf);
        taosMemoryFree(buf);
        RAW_NULL_CHECK(colValue);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValue", colValue));
      }

      cJSON* isNullCJson = cJSON_CreateBool(isNull);
      RAW_NULL_CHECK(isNullCJson);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValueNull", isNullCJson));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: {
      int32_t nTags = taosArrayGetSize(vAlterTbReq.pMultiTag);
      if (nTags <= 0) {
        uError("processAlterTable parse multi tags error");
        goto end;
      }

      cJSON* tags = cJSON_CreateArray();
      RAW_NULL_CHECK(tags);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));

      for (int32_t i = 0; i < nTags; i++) {
        cJSON* member = cJSON_CreateObject();
        RAW_NULL_CHECK(member);
        RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, member));

        SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
        RAW_NULL_CHECK(pTagVal);
        cJSON* tagName = cJSON_CreateString(pTagVal->tagName);
        RAW_NULL_CHECK(tagName);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colName", tagName));

        if (pTagVal->tagType == TSDB_DATA_TYPE_JSON) {
          // JSON tags are not handled on the multi-tag path.
          uError("processAlterTable multi tags do not support json tag");
          goto end;
        }
        bool isNull = pTagVal->isNull;
        if (!isNull) {
          int64_t bufSize = 0;
          if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
            bufSize = pTagVal->nTagVal * 2 + 2 + 3;
          } else {
            // Same head-room as the single-tag path: the text form of a
            // fixed-width numeric tag is longer than its binary size.
            bufSize = pTagVal->nTagVal + 32;
          }
          char* buf = taosMemoryCalloc(bufSize, 1);
          RAW_NULL_CHECK(buf);
          if (dataConverToStr(buf, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL) !=
              TSDB_CODE_SUCCESS) {
            taosMemoryFree(buf);
            goto end;
          }
          cJSON* colValue = cJSON_CreateString(buf);
          taosMemoryFree(buf);
          RAW_NULL_CHECK(colValue);
          RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValue", colValue));
        }
        cJSON* isNullCJson = cJSON_CreateBool(isNull);
        RAW_NULL_CHECK(isNullCJson);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValueNull", isNullCJson));
      }
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      break;
    }
    default:
      // Other actions are described by the common keys only.
      break;
  }

end:
  uDebug("alter table return");
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
    taosArrayDestroy(vAlterTbReq.pMultiTag);
  }
  tDecoderClear(&decoder);
  *pJson = json;
}
895

896
// Decodes the drop-super-table request carried in `metaRsp` and stores a JSON
// description of it in `*pJson`:
//   { "type": "drop", "tableType": "super", "tableName": <name> }
// If either argument is NULL the function logs and returns without touching
// `*pJson`. If decoding fails `*pJson` is set to NULL; if a later step fails
// it receives the object as built so far.
static void processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }

  int32_t      code = 0;  // written by the RAW_* check macros
  cJSON*       root = NULL;
  SVDropStbReq dropReq = {0};
  SDecoder     dec = {0};

  uDebug("processDropSTable data:%p", metaRsp);

  // The encoded request starts right after the SMsgHead.
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  tDecoderInit(&dec, body, bodyLen);
  if (tDecodeSVDropStbReq(&dec, &dropReq) < 0) {
    uError("tDecodeSVDropStbReq failed");
    goto end;
  }

  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);

  cJSON* typeItem = cJSON_CreateString("drop");
  RAW_NULL_CHECK(typeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "type", typeItem));

  cJSON* tableTypeItem = cJSON_CreateString("super");
  RAW_NULL_CHECK(tableTypeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableType", tableTypeItem));

  cJSON* tableNameItem = cJSON_CreateString(dropReq.name);
  RAW_NULL_CHECK(tableNameItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableName", tableNameItem));

end:
  uDebug("processDropSTable return");
  tDecoderClear(&dec);
  *pJson = root;
}
934
// Decodes the delete result carried in `metaRsp` and stores a JSON description
// of it in `*pJson`:
//   { "type": "delete", "sql": "delete from `<table>` where `<ts>` >= <skey> and `<ts>` <= <ekey>" }
// The SQL buffer is sized from the decoded names so the statement is never
// truncated.
// If either argument is NULL the function logs and returns without touching
// `*pJson`. If decoding or the buffer allocation fails `*pJson` is set to
// NULL; if a later step fails it receives the object as built so far.
static void processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  cJSON*     json = NULL;
  char*      sql = NULL;
  int32_t    code = 0;  // written by the RAW_* check macros

  uDebug("processDeleteTable data:%p", metaRsp);
  // decode and process req
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);

  tDecoderInit(&coder, data, len);
  if (tDecodeDeleteRes(&coder, &req) < 0) {
    uError("tDecodeDeleteRes failed");
    goto end;
  }

  // Room for the fixed SQL text, two signed 64-bit numbers and the NUL
  // terminator (about 80 bytes are needed; 128 leaves slack), plus the table
  // name once and the timestamp column name twice.
  size_t sqlSize = strlen(req.tableFName) + 2 * strlen(req.tsColName) + 128;
  sql = taosMemoryCalloc(sqlSize, 1);
  RAW_NULL_CHECK(sql);
  (void)snprintf(sql, sqlSize, "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
                 req.tsColName, req.skey, req.tsColName, req.ekey);

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("delete");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
  cJSON* sqlJson = cJSON_CreateString(sql);
  RAW_NULL_CHECK(sqlJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "sql", sqlJson));

end:
  uDebug("processDeleteTable return");
  taosMemoryFree(sql);
  tDecoderClear(&coder);
  *pJson = json;
}
974

975
// Decodes the batched drop-table request carried in `metaRsp` and stores a
// JSON description of it in `*pJson`:
//   { "type": "drop", "tableNameList": [<name>, ...] }
// If either argument is NULL the function logs and returns without touching
// `*pJson`. If decoding fails `*pJson` is set to NULL; if a later step fails
// it receives the object as built so far.
static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }

  int32_t          code = 0;  // written by the RAW_* check macros
  cJSON*           root = NULL;
  SVDropTbBatchReq batch = {0};
  SDecoder         dec = {0};

  uDebug("processDropTable data:%p", metaRsp);

  // The encoded request starts right after the SMsgHead.
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  tDecoderInit(&dec, body, bodyLen);
  if (tDecodeSVDropTbBatchReq(&dec, &batch) < 0) {
    uError("tDecodeSVDropTbBatchReq failed");
    goto end;
  }

  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);

  cJSON* typeItem = cJSON_CreateString("drop");
  RAW_NULL_CHECK(typeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "type", typeItem));

  cJSON* nameList = cJSON_CreateArray();
  RAW_NULL_CHECK(nameList);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableNameList", nameList));

  // One array entry per table in the batch.
  for (int32_t idx = 0; idx < batch.nReqs; ++idx) {
    cJSON* nameItem = cJSON_CreateString(batch.pReqs[idx].name);
    RAW_NULL_CHECK(nameItem);
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(nameList, nameItem));
  }

end:
  uDebug("processDropTable return");
  tDecoderClear(&dec);
  *pJson = root;
}
1016

1017
// Replays a create-super-table meta message on the connection `taos`.
//
// The vnode-level request is decoded from `meta` (after the SMsgHead),
// converted into an SMCreateStbReq (columns, tags, versions, mapped suid,
// "ignore if exists"), serialized and sent as TDMT_MND_CREATE_STB. If the
// request succeeds, the table's entry is removed from the catalog cache.
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments or (de)serialization
// failures, TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
// otherwise the code of the first failing step or the request's result code.
static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
  if (meta == NULL || taos == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SDecoder       dec = {0};
  SVCreateStbReq vReq = {0};
  SMCreateStbReq mReq = {0};

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (pRequest->pDb == NULL) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // The encoded request starts right after the SMsgHead.
  void*    body = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t bodyLen = metaLen - sizeof(SMsgHead);
  tDecoderInit(&dec, body, bodyLen);
  if (tDecodeSVCreateStbReq(&dec, &vReq) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // Without per-column compression info, use the per-type default.
  const bool useDefaultCmpr = (vReq.colCmpr.nCols == 0);

  // Columns.
  mReq.pColumns = taosArrayInit(vReq.schemaRow.nCols, sizeof(SFieldWithOptions));
  RAW_NULL_CHECK(mReq.pColumns);
  for (int32_t col = 0; col < vReq.schemaRow.nCols; ++col) {
    SSchema*          src = &vReq.schemaRow.pSchema[col];
    SFieldWithOptions dst = {.type = src->type, .flags = src->flags, .bytes = src->bytes};
    tstrncpy(dst.name, src->name, TSDB_COL_NAME_LEN);

    if (useDefaultCmpr) {
      dst.compress = createDefaultColCmprByType(src->type);
    } else {
      dst.compress = vReq.colCmpr.pColCmpr[col].alg;
    }
    RAW_NULL_CHECK(taosArrayPush(mReq.pColumns, &dst));
  }

  // Tags.
  mReq.pTags = taosArrayInit(vReq.schemaTag.nCols, sizeof(SField));
  RAW_NULL_CHECK(mReq.pTags);
  for (int32_t tag = 0; tag < vReq.schemaTag.nCols; ++tag) {
    SSchema* src = &vReq.schemaTag.pSchema[tag];
    SField   dst = {.type = src->type, .flags = src->flags, .bytes = src->bytes};
    tstrncpy(dst.name, src->name, TSDB_COL_NAME_LEN);
    RAW_NULL_CHECK(taosArrayPush(mReq.pTags, &dst));
  }

  mReq.colVer = vReq.schemaRow.version;
  mReq.tagVer = vReq.schemaTag.version;
  mReq.numOfColumns = vReq.schemaRow.nCols;
  mReq.numOfTags = vReq.schemaTag.nCols;
  mReq.commentLen = -1;
  mReq.suid = processSuid(vReq.suid, pRequest->pDb);
  mReq.source = TD_REQ_FROM_TAOX;
  mReq.igExists = true;

  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, vReq.name, vReq.suid,
         mReq.suid);

  STscObj* pTscObj = pRequest->pTscObj;
  SName    stbName = {0};
  toName(pTscObj->acctId, pRequest->pDb, vReq.name, &stbName);
  RAW_RETURN_CHECK(tNameExtractFullName(&stbName, mReq.name));

  // Serialize: a first pass with a NULL buffer yields the required length.
  SCmdMsgInfo cmdMsg = {0};
  cmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  cmdMsg.msgType = TDMT_MND_CREATE_STB;
  cmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &mReq);
  if (cmdMsg.msgLen <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  cmdMsg.pMsg = taosMemoryMalloc(cmdMsg.msgLen);
  RAW_NULL_CHECK(cmdMsg.pMsg);
  if (tSerializeSMCreateStbReq(cmdMsg.pMsg, cmdMsg.msgLen, &mReq) <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    taosMemoryFree(cmdMsg.pMsg);
    goto end;
  }

  SQuery query = {0};
  query.execMode = QUERY_EXEC_MODE_RPC;
  query.pCmdMsg = &cmdMsg;
  query.msgType = query.pCmdMsg->msgType;
  query.stableQuery = true;

  // The outcome is read from pRequest->code below.
  launchQueryImpl(pRequest, &query, true, NULL);

  taosMemoryFree(cmdMsg.pMsg);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // Drop any cached meta for the table that was just (re)created.
    SCatalog* pCatalog = NULL;
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &stbName));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&mReq);
  tDecoderClear(&dec);
  return code;
}
1130

1131
// Replays a drop-super-table meta message on the connection `taos`.
//
// The request is decoded from `meta` (after the SMsgHead). The table's
// current uid is looked up in the catalog; a table that does not exist is
// treated as success. Otherwise a TDMT_MND_DROP_STB request ("ignore if not
// exists") is serialized and sent, and if it succeeds the table's entry is
// removed from the catalog cache.
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments or (de)serialization
// failures, TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
// otherwise the code of the first failing step or the request's result code.
static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) {
  if (meta == NULL || taos == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;
  SDecoder     dec = {0};
  SVDropStbReq vReq = {0};
  SMDropStbReq mReq = {0};

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (pRequest->pDb == NULL) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // The encoded request starts right after the SMsgHead.
  void*    body = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t bodyLen = metaLen - sizeof(SMsgHead);
  tDecoderInit(&dec, body, bodyLen);
  if (tDecodeSVDropStbReq(&dec, &vReq) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // Look up the table's current uid on this cluster.
  SName lookupName = {0};
  toName(pTscObj->acctId, pRequest->pDb, vReq.name, &lookupName);
  STableMeta* pMeta = NULL;
  code = catalogGetTableMeta(pCatalog, &conn, &lookupName, &pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      // Nothing to drop: report success.
      code = TSDB_CODE_SUCCESS;
      taosMemoryFreeClear(pMeta);
    }
    goto end;
  }
  mReq.suid = pMeta->uid;
  taosMemoryFreeClear(pMeta);

  // build drop stable
  mReq.igNotExists = true;
  mReq.source = TD_REQ_FROM_TAOX;

  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, vReq.name, vReq.suid,
         mReq.suid);

  SName stbName = {0};
  toName(pTscObj->acctId, pRequest->pDb, vReq.name, &stbName);
  if (tNameExtractFullName(&stbName, mReq.name) != 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // Serialize: a first pass with a NULL buffer yields the required length.
  SCmdMsgInfo cmdMsg = {0};
  cmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  cmdMsg.msgType = TDMT_MND_DROP_STB;
  cmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &mReq);
  if (cmdMsg.msgLen <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  cmdMsg.pMsg = taosMemoryMalloc(cmdMsg.msgLen);
  RAW_NULL_CHECK(cmdMsg.pMsg);
  if (tSerializeSMDropStbReq(cmdMsg.pMsg, cmdMsg.msgLen, &mReq) <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    taosMemoryFree(cmdMsg.pMsg);
    goto end;
  }

  SQuery query = {0};
  query.execMode = QUERY_EXEC_MODE_RPC;
  query.pCmdMsg = &cmdMsg;
  query.msgType = query.pCmdMsg->msgType;
  query.stableQuery = true;

  // The outcome is read from pRequest->code below.
  launchQueryImpl(pRequest, &query, true, NULL);
  taosMemoryFree(cmdMsg.pMsg);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // Drop the cached meta of the table that was just removed.
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &stbName));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  destroyRequest(pRequest);
  tDecoderClear(&dec);
  return code;
}
1232

1233
// Create-table requests collected for one vgroup. taosCreateTable() stores
// these by value in a hash map keyed by the vgroup id.
typedef struct SVgroupCreateTableBatch {
1234
  // batch request; req.pArray holds the SVCreateTbReq entries for this vgroup
  SVCreateTbBatchReq req;
1235
  // vgroup the tables hash to
  SVgroupInfo        info;
1236
  // database name copied from the request
  char               dbName[TSDB_DB_NAME_LEN];
1237
} SVgroupCreateTableBatch;
1238

1239
static void destroyCreateTbReqBatch(void* data) {
144✔
1240
  if (data == NULL) {
144!
1241
    uError("invalid parameter in %s", __func__);
×
1242
    return;
×
1243
  }
1244
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
144✔
1245
  taosArrayDestroy(pTbBatch->req.pArray);
144✔
1246
}
1247

1248
// Replays a batched create-table meta message on the connection `taos`.
//
// The batch is decoded from `meta` (after the SMsgHead). For every table:
//   - the target vgroup is resolved through the catalog;
//   - TD_CREATE_IF_NOT_EXISTS is forced;
//   - for child tables the super table's current uid replaces the recorded
//     suid, and tag column ids are remapped by tag name to the current schema
//     (the tag blob is rebuilt when any id changed). Child tables whose super
//     table does not exist are skipped.
// Requests are grouped per vgroup, serialized and executed as
// TDMT_VND_CREATE_TABLE. If the request succeeds, the cached meta of the
// created tables is removed.
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments or a decode failure,
// TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database, otherwise
// the code of the first failing step or the request's result code.
static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;
  // Owns the rebuilt tag blobs; they are released together at `end`.
  SArray*            pTagList = taosArrayInit(0, POINTER_BYTES);
  RAW_NULL_CHECK(pTagList);
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
    // change tag cid to new cid
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      STableMeta* pTableMeta = NULL;
      SName       sName = {0};
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        // The super table is gone: skip this child table.
        code = TSDB_CODE_SUCCESS;
        taosMemoryFreeClear(pTableMeta);
        continue;
      }

      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
      pCreateReq->ctb.suid = pTableMeta->uid;

      SArray* pTagVals = NULL;
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
      if (code != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pTableMeta);
        goto end;
      }

      // Match each recorded tag name against the current tag schema and adopt
      // the current column id when it differs.
      bool rebuildTag = false;
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
        if (tName == NULL) {
          continue;
        }
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
          SSchema* tag = &pTableMeta->schema[j];
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
            if (pTagVal) {
              if (pTagVal->cid != tag->colId) {
                pTagVal->cid = tag->colId;
                rebuildTag = true;
              }
            } else {
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
            }
          }
        }
      }
      taosMemoryFreeClear(pTableMeta);
      if (rebuildTag) {
        STag* ppTag = NULL;
        code = tTagNew(pTagVals, 1, false, &ppTag);
        taosArrayDestroy(pTagVals);
        pTagVals = NULL;
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        // Hand the new blob to pTagList. If that fails, free it here and let
        // the NULL check record the error, so the failure is not reported as
        // success.
        void* pushed = taosArrayPush(pTagList, &ppTag);
        if (pushed == NULL) {
          tTagFree(ppTag);
        }
        RAW_NULL_CHECK(pushed);
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
      }
      taosArrayDestroy(pTagVals);
    }
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // Until the batch is stored in the map, its array is owned here and
      // must be released on every failure path.
      void* pushed = taosArrayPush(tBatch.req.pArray, pCreateReq);
      if (pushed == NULL) {
        taosArrayDestroy(tBatch.req.pArray);
      }
      RAW_NULL_CHECK(pushed);
      tBatch.req.source = TD_REQ_FROM_TAOX;
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != TSDB_CODE_SUCCESS) {
        // NOTE(review): assumes a failed put leaves the map without a copy of
        // tBatch, so the array is released exactly once - confirm.
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
    }
  }

  if (taosHashGetSize(pVgroupHashmap) == 0) {
    // Every table was skipped: nothing to send.
    goto end;
  }
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_NULL_CHECK(pQuery->pRoot);

  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  // The outcome is read from pRequest->code below.
  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteSVCreateTbBatchReq(&req);

  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  taosArrayDestroyP(pTagList, NULL);
  return code;
}
1426

1427
// Drop-table requests grouped with a vgroup and a database name.
// NOTE(review): presumably the drop-side counterpart of
// SVgroupCreateTableBatch, filled by taosDropTable() - confirm against that
// function.
typedef struct SVgroupDropTableBatch {
1428
  // batch request; req.pArray is released by destroyDropTbReqBatch()
  SVDropTbBatchReq req;
1429
  // vgroup information for this batch
  SVgroupInfo      info;
1430
  // database name
  char             dbName[TSDB_DB_NAME_LEN];
1431
} SVgroupDropTableBatch;
1432

1433
static void destroyDropTbReqBatch(void* data) {
3✔
1434
  if (data == NULL) {
3!
1435
    uError("invalid parameter in %s", __func__);
×
1436
    return;
×
1437
  }
1438
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
3✔
1439
  taosArrayDestroy(pTbBatch->req.pArray);
3✔
1440
}
1441

1442
/*
 * Replay a raw "drop table" meta message against the database selected on `taos`.
 *
 * The payload that follows the SMsgHead is decoded as an SVDropTbBatchReq. Each request is forced
 * to igNotExists, gets its suid replaced by the suid from the local catalog, and is grouped by
 * target vgroup; the groups are then sent as one TDMT_VND_DROP_TABLE query. Tables for which the
 * catalog reports TSDB_CODE_PAR_TABLE_NOT_EXIST are skipped, and no query is sent when nothing is
 * left to drop.
 *
 * @param taos     connection handle
 * @param meta     raw message, starting with an SMsgHead
 * @param metaLen  length of `meta` in bytes
 * @return TSDB_CODE_SUCCESS, or the first error code encountered
 */
static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // the message head is skipped below; a shorter payload would make the unsigned length wrap around
  if (metaLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));

  // vgId -> SVgroupDropTableBatch; entries are released by destroyDropTbReqBatch on cleanup
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to drop table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;
    //    pDropReq->suid = processSuid(pDropReq->suid, pRequest->pDb);

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      // already gone locally: nothing to drop for this entry
      code = TSDB_CODE_SUCCESS;
      taosMemoryFreeClear(pTableMeta);
      continue;
    }
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
    // the suid carried by the message is replaced by the one known to the local catalog
    tb_uid_t oldSuid = pDropReq->suid;
    pDropReq->suid = pTableMeta->suid;
    taosMemoryFreeClear(pTableMeta);
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
           pDropReq->suid);

    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // Until the batch is stored in the hash nobody else owns the array, so it has to be released
      // here when either of the next two steps fails.
      if (taosArrayPush(tBatch.req.pArray, pDropReq) == NULL) {
        code = terrno;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != 0) {
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
    }
  }

  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
  // NOTE(review): pBufArray is only handed over in rewriteToVnodeModifyOpStmt; if node creation
  // fails before that it does not appear to be released here - confirm and add cleanup if needed.
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }
  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}
1557

1558
/*
 * Replay a raw "delete data" meta message by turning it into a DELETE statement.
 *
 * The payload after the SMsgHead is decoded as an SDeleteRes; the table name, timestamp column and
 * [skey, ekey] range from it are formatted into SQL and executed through taosQueryImpl.
 * TSDB_CODE_PAR_TABLE_NOT_EXIST and TSDB_CODE_PAR_GET_META_ERROR from the query are treated as
 * success.
 *
 * @param taos     connection handle
 * @param meta     raw message, starting with an SMsgHead
 * @param metaLen  length of `meta` in bytes
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a malformed or oversized request, or the
 *         error reported by the query
 */
static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  char       sql[256] = {0};
  int32_t    code = TSDB_CODE_SUCCESS;

  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);

  // the message head is skipped below; a shorter payload would make the unsigned length wrap around
  if (metaLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeDeleteRes(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  int32_t sqlLen = snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                            req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  // A statement that does not fit must not be executed: truncation could cut the upper bound short
  // and silently change the deleted range.
  if (sqlLen < 0 || (size_t)sqlLen >= sizeof(sql)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
  RAW_NULL_CHECK(res);
  SRequestObj* pRequest = (SRequestObj*)res;
  code = pRequest->code;
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
    // nothing to delete from on this side
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);

end:
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));
  tDecoderClear(&coder);
  return code;
}
1596

1597
/*
 * Replay a raw "alter table" meta message against the database selected on `taos`.
 *
 * The payload after the SMsgHead is decoded as an SVAlterTbReq, re-encoded with
 * source = TD_REQ_FROM_TAOX behind a fresh SMsgHead addressed to the table's vgroup, and sent as a
 * TDMT_VND_ALTER_TABLE query. TSDB_ALTER_TABLE_UPDATE_OPTIONS requests are ignored, and
 * TSDB_CODE_TDB_TABLE_NOT_EXIST from the vnode is treated as success. On success the execution
 * result, if any, is passed to handleAlterTbExecRes.
 *
 * @param taos     connection handle
 * @param meta     raw message, starting with an SMsgHead
 * @param metaLen  length of `meta` in bytes
 * @return TSDB_CODE_SUCCESS, or the first error code encountered
 */
static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVAlterTbReq   req = {0};
  SDecoder       dcoder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // the message head is skipped below; a shorter payload would make the unsigned length wrap around
  if (metaLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);
  if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
  pArray = taosArrayInit(1, sizeof(void*));
  RAW_NULL_CHECK(pArray);

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  RAW_NULL_CHECK(pVgData);
  pVgData->vg = pInfo;

  // first pass computes the encoded size, second pass writes behind the message head
  int tlen = 0;
  req.source = TD_REQ_FROM_TAOX;
  tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
  if (code != 0) {
    code = terrno;
    goto end;
  }
  tlen += sizeof(SMsgHead);
  void* pMsg = taosMemoryMalloc(tlen);
  RAW_NULL_CHECK(pMsg);
  // Attach the buffer to pVgData right away so the cleanup at `end` releases it on every later
  // failure, including an encode error.
  pVgData->pData = pMsg;
  pVgData->size = tlen;
  ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
  ((SMsgHead*)pMsg)->contLen = htonl(tlen);
  void*    pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  SEncoder coder = {0};
  tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
  code = tEncodeSVAlterTbReq(&coder, &req);
  if (code != 0) {
    tEncoderClear(&coder);
    code = terrno;
    goto end;
  }
  tEncoderClear(&coder);

  pVgData->numOfTables = 1;
  RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));

  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code || NULL == pQuery) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);

  // pArray (and the pVgData it holds) were handed to pQuery above; drop the local references so
  // the cleanup below does not release them a second time
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
    code = TSDB_CODE_SUCCESS;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
    if (pRes->res != NULL) {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
end:
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&dcoder);
  qDestroyQuery(pQuery);
  return code;
}
1715

1716
/*
 * Convenience form of taos_write_raw_block_with_fields_with_reqid(): forwards all arguments
 * unchanged and passes 0 as the request id.
 */
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
                                     int numFields) {
  const int64_t reqid = 0;
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, reqid);
}
1720

1721
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
1✔
1722
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
1723
  if (taos == NULL || pData == NULL || tbname == NULL) {
1!
1724
    uError("invalid parameter in %s", __func__);
×
1725
    return TSDB_CODE_INVALID_PARA;
×
1726
  }
1727
  int32_t     code = TSDB_CODE_SUCCESS;
1✔
1728
  STableMeta* pTableMeta = NULL;
1✔
1729
  SQuery*     pQuery = NULL;
1✔
1730
  SHashObj*   pVgHash = NULL;
1✔
1731

1732
  SRequestObj* pRequest = NULL;
1✔
1733
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
1!
1734

1735
  uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
1!
1736
         rows, pData, tbname, fields, numFields);
1737

1738
  pRequest->syncQuery = true;
1✔
1739
  if (!pRequest->pDb) {
1!
1740
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1741
    goto end;
×
1742
  }
1743

1744
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
1✔
1745
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
1✔
1746
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
1✔
1747

1748
  struct SCatalog* pCatalog = NULL;
1✔
1749
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
1!
1750

1751
  SRequestConnInfo conn = {0};
1✔
1752
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
1✔
1753
  conn.requestId = pRequest->requestId;
1✔
1754
  conn.requestObjRefId = pRequest->self;
1✔
1755
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
1✔
1756

1757
  SVgroupInfo vgData = {0};
1✔
1758
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
1!
1759
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
1!
1760
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
1!
1761
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
1✔
1762
  RAW_NULL_CHECK(pVgHash);
1!
1763
  RAW_RETURN_CHECK(
1!
1764
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1765
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
1!
1766
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
1!
1767

1768
  launchQueryImpl(pRequest, pQuery, true, NULL);
1✔
1769
  code = pRequest->code;
1✔
1770

1771
end:
1✔
1772
  uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code));
1!
1773
  taosMemoryFreeClear(pTableMeta);
1!
1774
  qDestroyQuery(pQuery);
1✔
1775
  destroyRequest(pRequest);
1✔
1776
  taosHashCleanup(pVgHash);
1✔
1777
  return code;
1✔
1778
}
1779

1780
/*
 * Convenience form of taos_write_raw_block_with_reqid(): forwards all arguments unchanged and
 * passes 0 as the request id.
 */
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  const int64_t reqid = 0;
  return taos_write_raw_block_with_reqid(taos, rows, pData, tbname, reqid);
}
1783

1784
/*
 * Write one raw data block into table `tbname` of the database selected on `taos`.
 *
 * The table's vgroup and meta are looked up in the catalog, the block is bound with
 * rawBlockBindData() without explicit field descriptions, and the resulting query is launched
 * synchronously.
 *
 * @param rows   row count; only used for logging in this function
 * @param pData  raw block data
 * @param reqid  request id forwarded to buildRequest()
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments, or the first error code
 *         encountered
 */
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
  if (taos == NULL || pData == NULL || tbname == NULL) {
    // report the rejection like the other entry points in this file do
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t     code = TSDB_CODE_SUCCESS;
  STableMeta* pTableMeta = NULL;
  SQuery*     pQuery = NULL;
  SHashObj*   pVgHash = NULL;

  SRequestObj* pRequest = NULL;
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));

  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // fully qualified target: account / current database / table
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));

  struct SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  SVgroupInfo vgData = {0};
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
  // single-entry vgId -> SVgroupInfo map consumed by smlBuildOutput()
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgHash);
  RAW_RETURN_CHECK(
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false));
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  taosMemoryFreeClear(pTableMeta);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  taosHashCleanup(pVgHash);
  return code;
}
1839

1840
static void* getRawDataFromRes(void* pRetrieve) {
163✔
1841
  if (pRetrieve == NULL) {
163!
1842
    uError("invalid parameter in %s", __func__);
×
1843
    return NULL;
×
1844
  }
1845
  void* rawData = NULL;
163✔
1846
  // deal with compatibility
1847
  if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
163!
1848
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
1849
  } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
163✔
1850
             *(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
18!
1851
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
163✔
1852
  }
1853
  return rawData;
163✔
1854
}
1855

1856
/*
 * Decode every create-table request carried by `rsp` and index it in `pHashObj` by table name.
 *
 * Only child-table requests are accepted (anything else yields TSDB_CODE_INVALID_MSG). The first
 * request seen for a name is stored in the hash as a shallow copy, so the hash entry then owns the
 * decoded members; later requests for the same name are destroyed immediately.
 *
 * @return 0 on success, otherwise the first error code encountered
 */
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
  if (rsp == NULL || pHashObj == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // find schema data info
  int32_t       code = 0;
  SVCreateTbReq pCreateReq = {0};
  SDecoder      decoderTmp = {0};

  for (int j = 0; j < rsp->createTableNum; j++) {
    void** dataTmp = taosArrayGet(rsp->createTableReq, j);
    RAW_NULL_CHECK(dataTmp);
    int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j);
    RAW_NULL_CHECK(lenTmp);

    tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));

    if (pCreateReq.type != TSDB_CHILD_TABLE) {
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }
    if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL) {
      RAW_RETURN_CHECK(
          taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq)));
    } else {
      tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
    }
    // The decoded members now belong to the hash entry (or were just released). Forget them so a
    // failure in a later iteration cannot destroy them again through the `end` path.
    pCreateReq = (SVCreateTbReq){0};

    tDecoderClear(&decoderTmp);
  }
  return 0;

end:
  tDecoderClear(&decoderTmp);
  tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
  return code;
}
1896

1897
/* Initialisation states of the raw-write cache; `initedFlag` starts out as WRITE_RAW_INIT_START. */
typedef enum {
  WRITE_RAW_INIT_START = 0,
  WRITE_RAW_INIT_OK = 1,
  WRITE_RAW_INIT_FAIL = 2,
} WRITE_RAW_INIT_STATUS;
1902

1903
static SHashObj* writeRawCache = NULL;
1904
static int8_t    initFlag = 0;
1905
static int8_t    initedFlag = WRITE_RAW_INIT_START;
1906

1907
typedef struct {
1908
  SHashObj* pVgHash;
1909
  SHashObj* pNameHash;
1910
  SHashObj* pMetaHash;
1911
} rawCacheInfo;
1912

1913
typedef struct {
1914
  SVgroupInfo vgInfo;
1915
  int64_t     uid;
1916
  int64_t     suid;
1917
} tbInfo;
1918

1919
static void tmqFreeMeta(void* data) {
46✔
1920
  if (data == NULL) {
46!
1921
    uError("invalid parameter in %s", __func__);
×
1922
    return;
×
1923
  }
1924
  STableMeta* pTableMeta = *(STableMeta**)data;
46✔
1925
  taosMemoryFree(pTableMeta);
46!
1926
}
1927

1928
static void freeRawCache(void* data) {
×
1929
  if (data == NULL) {
×
1930
    uError("invalid parameter in %s", __func__);
×
1931
    return;
×
1932
  }
1933
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
1934
  taosHashCleanup(pRawCache->pMetaHash);
×
1935
  taosHashCleanup(pRawCache->pNameHash);
×
1936
  taosHashCleanup(pRawCache->pVgHash);
×
1937
}
1938

1939
static int32_t initRawCacheHash() {
17✔
1940
  if (writeRawCache == NULL) {
17!
1941
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
17✔
1942
    if (writeRawCache == NULL) {
17!
1943
      return terrno;
×
1944
    }
1945
    taosHashSetFreeFp(writeRawCache, freeRawCache);
17✔
1946
  }
1947
  return 0;
17✔
1948
}
1949

1950
/*
 * Decide whether the cached table meta is out of date with respect to an incoming raw block.
 *
 * The meta is considered stale when the column count differs from the block's schema wrapper, when
 * a wrapper column has no same-named column in the meta, or when checkSchema() rejects the matching
 * column against the block's per-column schema entry.
 *
 * @param rawData     raw block; its fixed-size header is skipped to reach the column schema entries
 * @param pTableMeta  cached meta to validate
 * @param pSW         schema wrapper describing the block's columns
 * @return true when the meta should be fetched again; false when it matches or when an argument is
 *         NULL
 */
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
  if (rawData == NULL || pSW == NULL) {
    return false;
  }
  if (pTableMeta == NULL) {
    uError("invalid parameter in %s", __func__);
    return false;
  }
  char* p = (char*)rawData;
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
  // column length |
  // the code skips five int32 fields and one uint64 field to reach the column schema
  p += sizeof(int32_t);
  p += sizeof(int32_t);
  p += sizeof(int32_t);
  p += sizeof(int32_t);
  p += sizeof(int32_t);
  p += sizeof(uint64_t);
  // explicit cast: char* and int8_t* are distinct pointer types
  int8_t* fields = (int8_t*)p;

  if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
    return true;
  }

  for (int i = 0; i < pSW->nCols; i++) {
    const char* fieldName = pSW->pSchema[i].name;
    int         j = 0;
    // look the block column up by name in the cached meta
    for (; j < pTableMeta->tableInfo.numOfColumns; j++) {
      SSchema* pColSchema = &pTableMeta->schema[j];

      if (strcmp(pColSchema->name, fieldName) == 0) {
        if (checkSchema(pColSchema, fields, NULL, 0) != 0) {
          return true;
        }
        break;
      }
    }
    // each column schema entry in the block is one int8 followed by one int32
    fields += sizeof(int8_t) + sizeof(int32_t);

    // no column with that name in the cached meta
    if (j == pTableMeta->tableInfo.numOfColumns) return true;
  }
  return false;
}
1992

1993
/*
 * Fetch, or create on first use, the per-connection lookup tables stored in writeRawCache.
 *
 * The cache entry is keyed by the pointer value of `key`. On a hit the three cached tables are
 * returned. On a miss three new tables are created, registered in writeRawCache and returned; the
 * cache owns them from then on.
 *
 * On failure nothing is registered, any table created so far is destroyed, and all three output
 * pointers are set to NULL so the caller never sees a freed table.
 *
 * @param pVgHash    out: vgId -> SVgroupInfo table
 * @param pNameHash  out: table name -> tbInfo table
 * @param pMetaHash  out: suid -> STableMeta* table (entries freed by tmqFreeMeta)
 * @param key        connection handle used as cache key
 * @return 0 on success, otherwise an error code
 */
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t      code = 0;
  SHashObj*    vgHash = NULL;
  SHashObj*    nameHash = NULL;
  SHashObj*    metaHash = NULL;
  rawCacheInfo newInfo = {0};

  void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
  if (cacheInfo != NULL) {
    rawCacheInfo* info = (rawCacheInfo*)cacheInfo;
    *pVgHash = info->pVgHash;
    *pNameHash = info->pNameHash;
    *pMetaHash = info->pMetaHash;
    return 0;
  }

  // Build into locals first: the caller's variables are only written once everything succeeded,
  // and the error path never touches whatever the caller passed in.
  vgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(vgHash);
  nameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(nameHash);
  metaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(metaHash);
  taosHashSetFreeFp(metaHash, tmqFreeMeta);

  newInfo.pVgHash = vgHash;
  newInfo.pNameHash = nameHash;
  newInfo.pMetaHash = metaHash;
  RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &newInfo, sizeof(rawCacheInfo)));

  *pVgHash = vgHash;
  *pNameHash = nameHash;
  *pMetaHash = metaHash;
  return 0;

end:
  taosHashCleanup(metaHash);
  taosHashCleanup(nameHash);
  taosHashCleanup(vgHash);
  *pVgHash = NULL;
  *pNameHash = NULL;
  *pMetaHash = NULL;
  return code;
}
2024

2025
/*
 * Common set-up for the raw write paths: creates a synchronous request on `taos`, requires a
 * current database, obtains the catalog handle for the connection's cluster and fills `conn` with
 * the transport, request ids and management endpoints.
 *
 * The request is returned through `pRequest` even when a later step fails, so the caller stays
 * responsible for destroying it.
 *
 * @return 0 on success, TSDB_CODE_PAR_DB_NOT_SPECIFIED when no database is selected, or the error
 *         from buildRequest()/catalogGetHandle()
 */
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
  if (taos == NULL || pRequest == NULL || pCatalog == NULL || conn == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));

  SRequestObj* pReq = *pRequest;
  pReq->syncQuery = true;
  if (!pReq->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  RAW_RETURN_CHECK(catalogGetHandle(pReq->pTscObj->pAppInfo->clusterId, pCatalog));
  conn->pTrans = pReq->pTscObj->pAppInfo->pTransporter;
  conn->requestId = pReq->requestId;
  conn->requestObjRefId = pReq->self;
  conn->mgmtEps = getEpSet_s(&pReq->pTscObj->pAppInfo->mgmtEp);

end:
  return code;
}
2047

2048
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
2049
static int32_t  decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func,
71✔
2050
                              SMqRspObj* rspObj) {
2051
  if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) {
71!
2052
    uError("invalid parameter in %s", __func__);
×
2053
    return TSDB_CODE_INVALID_PARA;
×
2054
  }
2055
  int8_t dataVersion = *(int8_t*)data;
71✔
2056
  if (dataVersion >= MQ_DATA_RSP_VERSION) {
71✔
2057
    data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
53✔
2058
    if (dataLen < sizeof(int8_t) + sizeof(int32_t)) {
53!
2059
      return TSDB_CODE_INVALID_PARA;
×
2060
    }
2061
    dataLen -= sizeof(int8_t) + sizeof(int32_t);
53✔
2062
  }
2063

2064
  rspObj->resIter = -1;
71✔
2065
  tDecoderInit(decoder, data, dataLen);
71✔
2066
  int32_t code = func(decoder, &rspObj->dataRsp);
71✔
2067
  if (code != 0) {
71!
2068
    SET_ERROR_MSG("decode mq taosx data rsp failed");
×
2069
  }
2070
  return code;
71✔
2071
}
2072

2073
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
163✔
2074
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
2075
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
2076
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
163!
2077
      pMeta == NULL) {
2078
    uError("invalid parameter in %s", __func__);
×
2079
    return TSDB_CODE_INVALID_PARA;
×
2080
  }
2081
  int32_t     code = 0;
163✔
2082
  STableMeta* pTableMeta = NULL;
163✔
2083
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
163✔
2084
  if (tmpInfo == NULL || retry > 0) {
163!
2085
    tbInfo info = {0};
148✔
2086

2087
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
148!
2088
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
148!
2089
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
31✔
2090
    }
2091
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
148!
2092
    info.uid = pTableMeta->uid;
148✔
2093
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
148✔
2094
      info.suid = pTableMeta->suid;
100✔
2095
    } else {
2096
      info.suid = pTableMeta->uid;
48✔
2097
    }
2098
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
148✔
2099
    if (code != 0) {
148!
2100
      taosMemoryFree(pTableMeta);
×
2101
      goto end;
×
2102
    }
2103
    if (pCreateReqDst) {
148✔
2104
      pTableMeta->vgId = info.vgInfo.vgId;
31✔
2105
      pTableMeta->uid = pCreateReqDst->uid;
31✔
2106
      pCreateReqDst->ctb.suid = pTableMeta->suid;
31✔
2107
    }
2108

2109
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
148!
2110
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
148✔
2111
    RAW_RETURN_CHECK(
148!
2112
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
2113
  }
2114

2115
  if (pTableMeta == NULL || retry > 0) {
163!
2116
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
15✔
2117
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
15!
2118
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
3!
2119
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
3✔
2120
      if (code != 0) {
3!
2121
        taosMemoryFree(pTableMeta);
×
2122
        goto end;
×
2123
      }
2124

2125
    } else {
2126
      pTableMeta = *pTableMetaTmp;
12✔
2127
      pTableMeta->uid = tmpInfo->uid;
12✔
2128
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
12✔
2129
    }
2130
  }
2131
  *pMeta = pTableMeta;
163✔
2132

2133
end:
163✔
2134
  return code;
163✔
2135
}
2136

2137
/*
 * Write a raw TMQ data response into the database selected on `taos`.
 *
 * The payload is decoded with tDecodeMqDataRsp. Every block in it is bound against the (cached)
 * meta of its table and all blocks are submitted as one query. When the query fails with an error
 * for which NEED_CLIENT_HANDLE_ERROR() is true, the whole batch is rebuilt with refreshed meta and
 * sent again, up to three retries.
 *
 * A response without schema information is not written: the function stops at the first block and
 * returns the current code.
 *
 * @param taos     connection handle; also used as key for the per-connection caches
 * @param data     raw payload
 * @param dataLen  length of `data` in bytes
 * @return TSDB_CODE_SUCCESS, or the first error code encountered
 */
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t   code = TSDB_CODE_SUCCESS;
  SQuery*   pQuery = NULL;
  SMqRspObj rspObj = {0};
  SDecoder  decoder = {0};

  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));

  // the three tables belong to the per-connection cache and are not released here
  SHashObj* pVgHash = NULL;
  SHashObj* pNameHash = NULL;
  SHashObj* pMetaHash = NULL;
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
  int retry = 0;
  while (1) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    // this is the data-only path; the message used to say "meta data", copied from the metadata variant
    uDebug(LOG_ID_TAG " write raw data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      if (!rspObj.dataRsp.withSchema) {
        goto end;
      }

      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
      RAW_NULL_CHECK(pSW);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // pTableMeta is not freed here; processCacheMeta stores the same pointer in the meta cache
      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, pSW,
                                        rawData, retry));
      char err[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
        goto end;
      }
    }
    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
      // start over: rebuild the query and walk the blocks again with retry > 0, which makes
      // processCacheMeta refetch the meta
      qDestroyQuery(pQuery);
      pQuery = NULL;
      rspObj.resIter = -1;
      continue;
    }
    break;
  }

end:
  uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  return code;
}
2213

2214
// Writes one encoded "metadata" message (auto-create-table requests plus data
// blocks) to the server through the connection `taos`.
//
// The payload is decoded with tDecodeSTaosxRsp, every data block is bound to a
// freshly initialised query, and the query is executed. When the request ends
// with a code for which NEED_CLIENT_HANDLE_ERROR is true, the whole bind and
// execute cycle is repeated, up to three times.
//
// taos     connection handle, must not be NULL
// data     encoded message, must not be NULL
// dataLen  size of `data` in bytes
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the code of the
// first failing step or the final code stored in the request.
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = TSDB_CODE_SUCCESS;
  int              retry = 0;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};
  SQuery*          pQuery = NULL;
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};
  SHashObj*        pCreateTbHash = NULL;
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));

  // Create-table requests carried by the message, looked up by table name below.
  pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pCreateTbHash);
  RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));

  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);

    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      // Without schemas nothing can be bound; leave with the current code.
      if (!rspObj.dataRsp.withSchema) {
        goto end;
      }

      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
      RAW_NULL_CHECK(pSW);
      void* pBlock = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pBlock);
      void* rawData = getRawDataFromRes(pBlock);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);

      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // A create request for this table, if the message carried one (NULL otherwise).
      SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, pName.tname, strlen(pName.tname));
      STableMeta*    pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &pName,
                                        &pTableMeta, pSW, rawData, retry));

      char errBuf[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, errBuf, ERR_MSG_LEN,
                              true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, errBuf);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // Done unless the error is one the client may retry and attempts remain.
    if (!(NEED_CLIENT_HANDLE_ERROR(code)) || retry++ >= 3) {
      break;
    }
    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;  // restart from the first block
  }

end:
  uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteSTaosxRsp(&rspObj.dataRsp);
  for (void* pReq = taosHashIterate(pCreateTbHash, NULL); pReq != NULL; pReq = taosHashIterate(pCreateTbHash, pReq)) {
    tDestroySVCreateTbReq(pReq, TSDB_MSG_FLG_DECODE);
  }
  taosHashCleanup(pCreateTbHash);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  return code;
}
2305

2306
// Writes one encoded "raw data" message to the server through `taos`.
//
// The payload is decoded with tDecodeMqDataRsp. Each block is attached to the
// statement's vgroup data blocks with rawBlockBindRawData, the output is built
// with smlBuildOutputRaw, and the query is executed. When the request ends with
// a code for which NEED_CLIENT_HANDLE_ERROR is true, the cycle is repeated, up
// to three times.
//
// taos     connection handle, must not be NULL
// data     encoded message, must not be NULL
// dataLen  size of `data` in bytes
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the code of the
// first failing step or the final code stored in the request.
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t   code = TSDB_CODE_SUCCESS;
  SQuery*   pQuery = NULL;
  SHashObj* pVgroupHash = NULL;
  SMqRspObj rspObj = {0};
  SDecoder  decoder = {0};

  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));

  SHashObj* pVgHash = NULL;
  SHashObj* pNameHash = NULL;
  SHashObj* pMetaHash = NULL;
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
  int retry = 0;
  while (1) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
    // Scratch map used only while binding; released again before execution.
    pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(pVgroupHash);
    pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
    RAW_NULL_CHECK(pStmt->pVgDataBlocks);

    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // find schema data info
      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName,
                                        &pTableMeta, NULL, NULL, retry));
      code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
      if (code != TSDB_CODE_SUCCESS) {
        // rawBlockBindRawData fills no message buffer, so report the text of
        // the returned code instead of an always-empty string.
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, tstrerror(code));
        goto end;
      }
    }
    taosHashCleanup(pVgroupHash);
    pVgroupHash = NULL;

    RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
      qDestroyQuery(pQuery);
      pQuery = NULL;
      rspObj.resIter = -1;  // restart from the first block
      continue;
    }
    break;
  }

end:
  uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  taosHashCleanup(pVgroupHash);
  destroyRequest(pRequest);
  return code;
}
2389

2390
// Dispatches a single meta response to the JSON builder that matches its
// resMsgType. Message types not listed here are ignored and leave `*meta`
// untouched.
//
// pMetaRsp  decoded meta response, must not be NULL
// meta      receives the cJSON object produced by the selected builder
static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
  if (pMetaRsp == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }

  switch (pMetaRsp->resMsgType) {
    case TDMT_VND_CREATE_STB:
      processCreateStb(pMetaRsp, meta);
      break;
    case TDMT_VND_ALTER_STB:
      processAlterStb(pMetaRsp, meta);
      break;
    case TDMT_VND_DROP_STB:
      processDropSTable(pMetaRsp, meta);
      break;
    case TDMT_VND_CREATE_TABLE:
      processCreateTable(pMetaRsp, meta);
      break;
    case TDMT_VND_ALTER_TABLE:
      processAlterTable(pMetaRsp, meta);
      break;
    case TDMT_VND_DROP_TABLE:
      processDropTable(pMetaRsp, meta);
      break;
    case TDMT_VND_DELETE:
      processDeleteTable(pMetaRsp, meta);
      break;
    default:
      break;
  }
}
2411

2412
// Converts a batch meta response into one JSON document of the form
// {"tmq_meta_version": ..., "metas": [ ... ]}, with one array entry per meta
// message in the batch.
//
// pMsgRsp  batch response whose pMetaBuff/metaBuffLen hold the encoded batch
// string   on success receives the text returned by cJSON_PrintUnformatted;
//          left unchanged when any step fails
//
// Both decoders are cleared on every path, and a meta response whose decode
// fails is still released, so no decoder or response state is left behind.
static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
  if (pMsgRsp == NULL || string == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDecoder        coder = {0};
  SMqBatchMetaRsp rsp = {0};
  int32_t         code = 0;
  cJSON*          pJson = NULL;
  tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
  if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
    goto end;
  }

  pJson = cJSON_CreateObject();
  RAW_NULL_CHECK(pJson);
  RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION));
  cJSON* pMetaArr = cJSON_CreateArray();
  RAW_NULL_CHECK(pMetaArr);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(pJson, "metas", pMetaArr));

  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // Each entry starts with an SMqRspHead; a shorter entry would make the
    // payload length below wrap around to a huge unsigned value.
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      uError("invalid meta length:%d in %s", *len, __func__);
      goto end;
    }

    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
      tDeleteMqMetaRsp(&metaRsp);
      tDecoderClear(&metaCoder);
      goto end;
    }
    cJSON* pItem = NULL;
    processSimpleMeta(&metaRsp, &pItem);
    tDeleteMqMetaRsp(&metaRsp);
    tDecoderClear(&metaCoder);
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(pMetaArr, pItem));
  }

  *string = cJSON_PrintUnformatted(pJson);

end:
  // Shared by the success and failure paths.
  cJSON_Delete(pJson);
  tDeleteMqBatchMetaRsp(&rsp);
  tDecoderClear(&coder);
}
2461

2462
// Renders the meta information carried by a TMQ result as JSON text.
//
// res  a TMQ result of kind meta, metadata or batch meta
//
// Returns NULL when `res` is NULL or of any other kind; otherwise whatever
// string the matching converter produced (which may itself be NULL).
char* tmq_get_json_meta(TAOS_RES* res) {
  if (res == NULL) {
    uError("invalid parameter in %s", __func__);
    return NULL;
  }
  uDebug("tmq_get_json_meta res:%p", res);

  // Only the three meta-bearing result kinds are handled.
  if (!(TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res))) {
    return NULL;
  }

  SMqRspObj* pRsp = (SMqRspObj*)res;
  char*      json = NULL;

  if (TD_RES_TMQ_METADATA(res)) {
    processAutoCreateTable(&pRsp->dataRsp, &json);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    processBatchMetaToJson(&pRsp->batchMetaRsp, &json);
  } else if (TD_RES_TMQ_META(res)) {
    cJSON* pMeta = NULL;
    processSimpleMeta(&pRsp->metaRsp, &pMeta);
    json = cJSON_PrintUnformatted(pMeta);
    cJSON_Delete(pMeta);
  } else {
    uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res);
  }

  uDebug("tmq_get_json_meta string:%s", json);
  return json;
}
2490

2491
// Releases a string previously returned by tmq_get_json_meta.
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}
332!
2492

2493
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
105✔
2494
  if (pRsp == NULL) {
105!
UNCOV
2495
    uError("invalid parameter in %s", __func__);
×
UNCOV
2496
    return TSDB_CODE_INVALID_PARA;
×
2497
  }
2498
  SEncoder coder = {0};
105✔
2499
  tEncoderInit(&coder, NULL, 0);
105✔
2500
  if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1;
105!
2501
  if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1;
105!
2502
  int32_t pos = coder.pos;
105✔
2503
  tEncoderClear(&coder);
105✔
2504
  return pos;
105✔
2505
}
2506

2507
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
2508
static int32_t  encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
105✔
2509
  if (raw == NULL || encodeFunc == NULL || rspObj == NULL) {
105!
UNCOV
2510
    uError("invalid parameter in %s", __func__);
×
UNCOV
2511
    return TSDB_CODE_INVALID_PARA;
×
2512
  }
2513
  uint32_t len = 0;
105✔
2514
  int32_t  code = 0;
105✔
2515
  SEncoder encoder = {0};
105✔
2516
  void*    buf = NULL;
105✔
2517
  tEncodeSize(encodeFunc, rspObj, len, code);
105!
2518
  if (code < 0) {
105!
UNCOV
2519
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2520
    goto FAILED;
×
2521
  }
2522
  len += sizeof(int8_t) + sizeof(int32_t);
105✔
2523
  buf = taosMemoryCalloc(1, len);
105!
2524
  if (buf == NULL) {
105!
UNCOV
2525
    code = terrno;
×
UNCOV
2526
    goto FAILED;
×
2527
  }
2528
  tEncoderInit(&encoder, buf, len);
105✔
2529
  if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
105!
UNCOV
2530
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2531
    goto FAILED;
×
2532
  }
2533
  int32_t offsetLen = getOffSetLen(rspObj);
105✔
2534
  if (offsetLen <= 0) {
105!
UNCOV
2535
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2536
    goto FAILED;
×
2537
  }
2538
  if (tEncodeI32(&encoder, offsetLen) < 0) {
105!
UNCOV
2539
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2540
    goto FAILED;
×
2541
  }
2542
  if (encodeFunc(&encoder, rspObj) < 0) {
105!
UNCOV
2543
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2544
    goto FAILED;
×
2545
  }
2546
  tEncoderClear(&encoder);
105✔
2547

2548
  raw->raw = buf;
105✔
2549
  raw->raw_len = len;
105✔
2550
  return code;
105✔
UNCOV
2551
FAILED:
×
UNCOV
2552
  tEncoderClear(&encoder);
×
2553
  taosMemoryFree(buf);
×
2554
  return code;
×
2555
}
2556

2557
// Exposes the content of a TMQ result as a raw buffer in `*raw`.
//
// How `raw->raw` is obtained depends on the kind of `res`:
//   - meta / batch meta: points at the buffer held by `res`
//   - data / metadata:   newly allocated by encodeMqDataRsp
//   - raw data:          taken over from `res`, whose rawData pointer is then
//                        set to NULL
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments, the
// encoder's error code when encoding fails, or TSDB_CODE_TMQ_INVALID_MSG for an
// unknown result kind.
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (raw == NULL || res == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  *raw = (tmq_raw_data){0};
  SMqRspObj* rspObj = ((SMqRspObj*)res);
  if (TD_RES_TMQ_META(res)) {
    raw->raw = rspObj->metaRsp.metaRsp;
    raw->raw_len = rspObj->metaRsp.metaRspLen >= 0 ? rspObj->metaRsp.metaRspLen : 0;
    raw->raw_type = rspObj->metaRsp.resMsgType;
    uDebug("tmq get raw type meta:%p", raw);
  } else if (TD_RES_TMQ(res)) {
    int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw);
    if (code != 0) {
      // Log the code actually returned; terrno is not set on every failure path.
      uError("tmq get raw type error:%d", code);
      return code;
    }
    raw->raw_type = RES_TYPE__TMQ;
    uDebug("tmq get raw type data:%p", raw);
  } else if (TD_RES_TMQ_METADATA(res)) {
    int32_t code = encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw);
    if (code != 0) {
      // Log the code actually returned; terrno is not set on every failure path.
      uError("tmq get raw type error:%d", code);
      return code;
    }
    raw->raw_type = RES_TYPE__TMQ_METADATA;
    uDebug("tmq get raw type metadata:%p", raw);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    raw->raw = rspObj->batchMetaRsp.pMetaBuff;
    raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw batch meta:%p", raw);
  } else if (TD_RES_TMQ_RAW(res)) {
    // Ownership moves to the caller: detach the buffer from the result.
    raw->raw = rspObj->dataRsp.rawData;
    rspObj->dataRsp.rawData = NULL;
    raw->raw_len = rspObj->dataRsp.len;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw raw:%p", raw);
  } else {
    uError("tmq get raw error type:%d", *(int8_t*)res);
    return TSDB_CODE_TMQ_INVALID_MSG;
  }
  return TSDB_CODE_SUCCESS;
}
2602

2603
// Releases the buffer of a tmq_raw_data value and clears the global error
// message buffer.
//
// Only data, metadata and raw-data buffers are freed here; for every other
// raw_type the pointer is left alone. For raw data, the allocation is taken to
// begin sizeof(SMqRspHead) bytes before raw.raw, so the pointer is moved back
// by that amount before freeing.
void tmq_free_raw(tmq_raw_data raw) {
  uDebug("tmq free raw data type:%d", raw.raw_type);
  if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
    taosMemoryFree(raw.raw);
  } else if (raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL) {
    // Plain pointer subtraction; negating the unsigned sizeof result would rely
    // on wrap-around pointer addition.
    void* pHead = (char*)raw.raw - sizeof(SMqRspHead);
    taosMemoryFree(pHead);
  }
  (void)memset(terrMsg, 0, ERR_MSG_LEN);
}
316✔
2613

2614
static int32_t writeRawInit() {
420✔
2615
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
437✔
2616
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
17✔
2617
    if (old == 0) {
17!
2618
      int32_t code = initRawCacheHash();
17✔
2619
      if (code != 0) {
17!
UNCOV
2620
        uError("tmq writeRawImpl init error:%d", code);
×
UNCOV
2621
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
2622
        return code;
×
2623
      }
2624
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
17✔
2625
    }
2626
  }
2627

2628
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
420!
UNCOV
2629
    return TSDB_CODE_INTERNAL_ERROR;
×
2630
  }
2631
  return 0;
420✔
2632
}
2633

2634
// Routes one raw message to the writer that matches `type`.
//
// taos  connection handle, must not be NULL
// buf   encoded message, must not be NULL
// len   size of `buf` in bytes
// type  either a TDMT_VND_* message type or a RES_TYPE__TMQ* result type
//
// Returns the selected writer's result, TSDB_CODE_INVALID_PARA for NULL
// arguments or an unsupported type, and TSDB_CODE_INTERNAL_ERROR when
// writeRawInit fails.
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
  if (taos == NULL || buf == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  if (writeRawInit() != 0) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  // Super-table creation and alteration share one writer.
  if (type == TDMT_VND_CREATE_STB || type == TDMT_VND_ALTER_STB) {
    return taosCreateStb(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_STB) {
    return taosDropStb(taos, buf, len);
  }
  if (type == TDMT_VND_CREATE_TABLE) {
    return taosCreateTable(taos, buf, len);
  }
  if (type == TDMT_VND_ALTER_TABLE) {
    return taosAlterTable(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_TABLE) {
    return taosDropTable(taos, buf, len);
  }
  if (type == TDMT_VND_DELETE) {
    return taosDeleteData(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_METADATA) {
    return tmqWriteRawMetaDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_RAWDATA) {
    return tmqWriteRawRawDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ) {
    return tmqWriteRawDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_BATCH_META) {
    return tmqWriteBatchMetaDataImpl(taos, buf, len);
  }
  return TSDB_CODE_INVALID_PARA;
}
2668

2669
// Public entry point: writes a raw message obtained from tmq_get_raw through
// the connection `taos`.
//
// Returns TSDB_CODE_INVALID_PARA (and records an error message) when `taos` or
// the buffer is NULL or the length is not positive; otherwise the result of
// writeRawImpl.
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
  const bool usable = (taos != NULL) && (raw.raw != NULL) && (raw.raw_len > 0);
  if (!usable) {
    SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
    return TSDB_CODE_INVALID_PARA;
  }

  taosClearErrMsg();  // clear global error message
  return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
}
2678

2679
// Replays every meta message contained in an encoded batch, in order, by
// decoding each one and passing it to writeRawImpl. Processing stops at the
// first message that fails.
//
// taos     connection handle, must not be NULL
// meta     encoded batch meta response, must not be NULL
// metaLen  size of `meta` in bytes
//
// Returns TSDB_CODE_SUCCESS when all messages were written,
// TSDB_CODE_INVALID_PARA for NULL arguments, a malformed entry or a decode
// failure, or the code of the first failing write. Both decoders are cleared
// on every path, and a meta response whose decode fails is still released.
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SMqBatchMetaRsp rsp = {0};
  SDecoder        coder = {0};
  int32_t         code = TSDB_CODE_SUCCESS;

  // decode and process req
  tDecoderInit(&coder, meta, metaLen);
  if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // Each entry starts with an SMqRspHead; a shorter entry would make the
    // payload length below wrap around to a huge unsigned value.
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      uError("invalid meta length:%d in %s", *len, __func__);
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }

    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
      tDeleteMqMetaRsp(&metaRsp);
      tDecoderClear(&metaCoder);
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }
    code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
    tDeleteMqMetaRsp(&metaRsp);
    tDecoderClear(&metaCoder);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
  }

end:
  tDeleteMqBatchMetaRsp(&rsp);
  tDecoderClear(&coder);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc