• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.96
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "parser.h"
19
#include "tcol.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tdef.h"
23
#include "tglobal.h"
24
#include "tmsgtype.h"
25

26
#define RAW_NULL_CHECK(c) \
27
  do {                    \
28
    if (c == NULL) {      \
29
      code = terrno;      \
30
      goto end;           \
31
    }                     \
32
  } while (0)
33

34
#define RAW_FALSE_CHECK(c)           \
35
  do {                               \
36
    if (!c) {                        \
37
      code = TSDB_CODE_INVALID_PARA; \
38
      goto end;                      \
39
    }                                \
40
  } while (0)
41

42
#define RAW_RETURN_CHECK(c) \
43
  do {                      \
44
    code = c;               \
45
    if (code != 0) {        \
46
      goto end;             \
47
    }                       \
48
  } while (0)
49

50
#define LOG_ID_TAG   "connId:0x%" PRIx64 ", QID:0x%" PRIx64
51
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
52

53
#define TMQ_META_VERSION "1.0"
54

55
static bool  tmqAddJsonObjectItem(cJSON *object, const char *string, cJSON *item){
6,978✔
56
  bool ret = cJSON_AddItemToObject(object, string, item);
6,978✔
57
  if (!ret){
6,978!
58
    cJSON_Delete(item);
×
59
  }
60
  return ret;
6,978✔
61
}
62
static bool  tmqAddJsonArrayItem(cJSON *array, cJSON *item){
1,275✔
63
  bool ret = cJSON_AddItemToArray(array, item);
1,275✔
64
  if (!ret){
1,275!
65
    cJSON_Delete(item);
×
66
  }
67
  return ret;
1,275✔
68
}
69

70

71
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen);
72
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
144✔
73
  if (db == NULL) {
144!
74
    return suid;
×
75
  }
76
  return suid + MurmurHash3_32(db, strlen(db));
144✔
77
}
78
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t,
109✔
79
                                 SColCmprWrapper* pColCmprRow, cJSON** pJson) {
80
  if (schemaRow == NULL || name == NULL || pColCmprRow == NULL || pJson == NULL) {
109!
81
    uError("invalid parameter, schemaRow:%p, name:%p, pColCmprRow:%p, pJson:%p", schemaRow, name, pColCmprRow, pJson);
×
82
    return;
×
83
  }
84
  int32_t code = TSDB_CODE_SUCCESS;
109✔
85
  int8_t  buildDefaultCompress = 0;
109✔
86
  if (pColCmprRow->nCols <= 0) {
109!
87
    buildDefaultCompress = 1;
×
88
  }
89

90
  char*  string = NULL;
109✔
91
  cJSON* json = cJSON_CreateObject();
109✔
92
  RAW_NULL_CHECK(json);
109!
93
  cJSON* type = cJSON_CreateString("create");
109✔
94
  RAW_NULL_CHECK(type);
109!
95

96
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
109!
97
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
109✔
98
  RAW_NULL_CHECK(tableType);
109!
99
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
109!
100
  cJSON* tableName = cJSON_CreateString(name);
109✔
101
  RAW_NULL_CHECK(tableName);
109!
102
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
109!
103

104
  cJSON* columns = cJSON_CreateArray();
109✔
105
  RAW_NULL_CHECK(columns);
109!
106
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "columns", columns));
109!
107

108
  for (int i = 0; i < schemaRow->nCols; i++) {
641✔
109
    cJSON* column = cJSON_CreateObject();
532✔
110
    RAW_NULL_CHECK(column);
532!
111
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(columns, column));
532!
112
    SSchema* s = schemaRow->pSchema + i;
532✔
113
    cJSON*   cname = cJSON_CreateString(s->name);
532✔
114
    RAW_NULL_CHECK(cname);
532!
115
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "name", cname));
532!
116
    cJSON* ctype = cJSON_CreateNumber(s->type);
532✔
117
    RAW_NULL_CHECK(ctype);
532!
118
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "type", ctype));
532!
119
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
598!
120
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
66✔
121
      cJSON*  cbytes = cJSON_CreateNumber(length);
66✔
122
      RAW_NULL_CHECK(cbytes);
66!
123
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
66!
124
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
466✔
125
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
9✔
126
      cJSON*  cbytes = cJSON_CreateNumber(length);
9✔
127
      RAW_NULL_CHECK(cbytes);
9!
128
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
9!
129
    }
130
    cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
532✔
131
    RAW_NULL_CHECK(isPk);
532!
132
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "isPrimarykey", isPk));
532!
133

134
    if (pColCmprRow == NULL) {
532!
135
      continue;
×
136
    }
137

138
    uint32_t alg = 0;
532✔
139
    if (buildDefaultCompress) {
532!
140
      alg = createDefaultColCmprByType(s->type);
×
141
    } else {
142
      SColCmpr* pColCmpr = pColCmprRow->pColCmpr + i;
532✔
143
      alg = pColCmpr->alg;
532✔
144
    }
145
    const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(alg));
532✔
146
    RAW_NULL_CHECK(encode);
532!
147
    const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(alg));
532✔
148
    RAW_NULL_CHECK(compress);
532!
149
    const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(alg));
532✔
150
    RAW_NULL_CHECK(level);
532!
151

152
    cJSON* encodeJson = cJSON_CreateString(encode);
532✔
153
    RAW_NULL_CHECK(encodeJson);
532!
154
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "encode", encodeJson));
532!
155

156
    cJSON* compressJson = cJSON_CreateString(compress);
532✔
157
    RAW_NULL_CHECK(compressJson);
532!
158
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "compress", compressJson));
532!
159

160
    cJSON* levelJson = cJSON_CreateString(level);
532✔
161
    RAW_NULL_CHECK(levelJson);
532!
162
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "level", levelJson));
532!
163
  }
164

165
  cJSON* tags = cJSON_CreateArray();
109✔
166
  RAW_NULL_CHECK(tags);
109!
167
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
109!
168

169
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
330✔
170
    cJSON* tag = cJSON_CreateObject();
221✔
171
    RAW_NULL_CHECK(tag);
221!
172
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
221!
173
    SSchema* s = schemaTag->pSchema + i;
221✔
174
    cJSON*   tname = cJSON_CreateString(s->name);
221✔
175
    RAW_NULL_CHECK(tname);
221!
176
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
221!
177
    cJSON* ttype = cJSON_CreateNumber(s->type);
221✔
178
    RAW_NULL_CHECK(ttype);
221!
179
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
221!
180
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
229!
181
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
8✔
182
      cJSON*  cbytes = cJSON_CreateNumber(length);
8✔
183
      RAW_NULL_CHECK(cbytes);
8!
184
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
8!
185
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
213✔
186
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
63✔
187
      cJSON*  cbytes = cJSON_CreateNumber(length);
63✔
188
      RAW_NULL_CHECK(cbytes);
63!
189
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
63!
190
    }
191
  }
192

193
end:
109✔
194
  *pJson = json;
109✔
195
}
196

197
static int32_t setCompressOption(cJSON* json, uint32_t para) {
×
198
  if (json == NULL) {
×
199
    return TSDB_CODE_INVALID_PARA;
×
200
  }
201
  uint8_t encode = COMPRESS_L1_TYPE_U32(para);
×
202
  int32_t code = 0;
×
203
  if (encode != 0) {
×
204
    const char* encodeStr = columnEncodeStr(encode);
×
205
    RAW_NULL_CHECK(encodeStr);
×
206
    cJSON* encodeJson = cJSON_CreateString(encodeStr);
×
207
    RAW_NULL_CHECK(encodeJson);
×
208
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "encode", encodeJson));
×
209
    return code;
×
210
  }
211
  uint8_t compress = COMPRESS_L2_TYPE_U32(para);
×
212
  if (compress != 0) {
×
213
    const char* compressStr = columnCompressStr(compress);
×
214
    RAW_NULL_CHECK(compressStr);
×
215
    cJSON* compressJson = cJSON_CreateString(compressStr);
×
216
    RAW_NULL_CHECK(compressJson);
×
217
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "compress", compressJson));
×
218
    return code;
×
219
  }
220
  uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para);
×
221
  if (level != 0) {
×
222
    const char* levelStr = columnLevelStr(level);
×
223
    RAW_NULL_CHECK(levelStr);
×
224
    cJSON* levelJson = cJSON_CreateString(levelStr);
×
225
    RAW_NULL_CHECK(levelJson);
×
226
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "level", levelJson));
×
227
    return code;
×
228
  }
229

230
end:
×
231
  return code;
×
232
}
233
static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) {
55✔
234
  if (alterData == NULL || pJson == NULL) {
55!
235
    uError("invalid parameter in %s", __func__);
×
236
    return;
×
237
  }
238
  SMAlterStbReq req = {0};
55✔
239
  cJSON*        json = NULL;
55✔
240
  char*         string = NULL;
55✔
241
  int32_t       code = 0;
55✔
242

243
  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) {
55!
244
    goto end;
×
245
  }
246

247
  json = cJSON_CreateObject();
55✔
248
  RAW_NULL_CHECK(json);
55!
249
  cJSON* type = cJSON_CreateString("alter");
55✔
250
  RAW_NULL_CHECK(type);
55!
251
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
55!
252
  SName name = {0};
55✔
253
  RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
55!
254
  cJSON* tableType = cJSON_CreateString("super");
55✔
255
  RAW_NULL_CHECK(tableType);
55!
256
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
55!
257
  cJSON* tableName = cJSON_CreateString(name.tname);
55✔
258
  RAW_NULL_CHECK(tableName);
55!
259
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
55!
260

261
  cJSON* alterType = cJSON_CreateNumber(req.alterType);
55✔
262
  RAW_NULL_CHECK(alterType);
55!
263
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));
55!
264
  switch (req.alterType) {
55!
265
    case TSDB_ALTER_TABLE_ADD_TAG:
33✔
266
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
267
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
33✔
268
      RAW_NULL_CHECK(field);
33!
269
      cJSON* colName = cJSON_CreateString(field->name);
33✔
270
      RAW_NULL_CHECK(colName);
33!
271
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
33!
272
      cJSON* colType = cJSON_CreateNumber(field->type);
33✔
273
      RAW_NULL_CHECK(colType);
33!
274
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
33!
275

276
      if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
33!
277
          field->type == TSDB_DATA_TYPE_GEOMETRY) {
33!
278
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
11✔
279
        cJSON*  cbytes = cJSON_CreateNumber(length);
11✔
280
        RAW_NULL_CHECK(cbytes);
11!
281
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
11!
282
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
22!
283
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
×
284
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
285
        RAW_NULL_CHECK(cbytes);
×
286
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
287
      }
288
      break;
33✔
289
    }
290
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
×
291
      SFieldWithOptions* field = taosArrayGet(req.pFields, 0);
×
292
      RAW_NULL_CHECK(field);
×
293
      cJSON* colName = cJSON_CreateString(field->name);
×
294
      RAW_NULL_CHECK(colName);
×
295
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
×
296
      cJSON* colType = cJSON_CreateNumber(field->type);
×
297
      RAW_NULL_CHECK(colType);
×
298
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
×
299

300
      if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
×
301
          field->type == TSDB_DATA_TYPE_GEOMETRY) {
×
302
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
×
303
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
304
        RAW_NULL_CHECK(cbytes);
×
305
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
306
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
×
307
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
×
308
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
309
        RAW_NULL_CHECK(cbytes);
×
310
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
311
      }
312
      RAW_RETURN_CHECK(setCompressOption(json, field->compress));
×
313
      break;
×
314
    }
315
    case TSDB_ALTER_TABLE_DROP_TAG:
11✔
316
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
317
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
11✔
318
      RAW_NULL_CHECK(field);
11!
319
      cJSON* colName = cJSON_CreateString(field->name);
11✔
320
      RAW_NULL_CHECK(colName);
11!
321
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
11!
322
      break;
11✔
323
    }
324
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
11✔
325
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
326
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
11✔
327
      RAW_NULL_CHECK(field);
11!
328
      cJSON* colName = cJSON_CreateString(field->name);
11✔
329
      RAW_NULL_CHECK(colName);
11!
330
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
11!
331
      cJSON* colType = cJSON_CreateNumber(field->type);
11✔
332
      RAW_NULL_CHECK(colType);
11!
333
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
11!
334
      if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
11!
335
          field->type == TSDB_DATA_TYPE_GEOMETRY) {
11!
336
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
11✔
337
        cJSON*  cbytes = cJSON_CreateNumber(length);
11✔
338
        RAW_NULL_CHECK(cbytes);
11!
339
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
11!
340
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
×
341
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
×
342
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
343
        RAW_NULL_CHECK(cbytes);
×
344
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
345
      }
346
      break;
11✔
347
    }
348
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
×
349
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
350
      TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0);
×
351
      RAW_NULL_CHECK(oldField);
×
352
      TAOS_FIELD* newField = taosArrayGet(req.pFields, 1);
×
353
      RAW_NULL_CHECK(newField);
×
354
      cJSON* colName = cJSON_CreateString(oldField->name);
×
355
      RAW_NULL_CHECK(colName);
×
356
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
×
357
      cJSON* colNewName = cJSON_CreateString(newField->name);
×
358
      RAW_NULL_CHECK(colNewName);
×
359
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName));
×
360
      break;
×
361
    }
362
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
×
363
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
×
364
      RAW_NULL_CHECK(field);
×
365
      cJSON* colName = cJSON_CreateString(field->name);
×
366
      RAW_NULL_CHECK(colName);
×
367
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
×
368
      RAW_RETURN_CHECK(setCompressOption(json, field->bytes));
×
369
      break;
×
370
    }
371
    default:
×
372
      break;
×
373
  }
374

375
end:
55✔
376
  tFreeSMAltertbReq(&req);
55✔
377
  *pJson = json;
55✔
378
}
379

380
static void processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
89✔
381
  if (metaRsp == NULL || pJson == NULL) {
89!
382
    uError("invalid parameter in %s", __func__);
×
383
    return;
×
384
  }
385
  SVCreateStbReq req = {0};
89✔
386
  SDecoder       coder = {0};
89✔
387

388
  uDebug("create stable data:%p", metaRsp);
89!
389
  // decode and process req
390
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
89✔
391
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
89✔
392
  tDecoderInit(&coder, data, len);
89✔
393

394
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
89!
395
    goto end;
×
396
  }
397
  buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr, pJson);
89✔
398

399
end:
89✔
400
  uDebug("create stable return");
89!
401
  tDecoderClear(&coder);
89✔
402
}
403

404
static void processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
55✔
405
  if (metaRsp == NULL || pJson == NULL) {
55!
406
    uError("invalid parameter in %s", __func__);
×
407
    return;
×
408
  }
409
  SVCreateStbReq req = {0};
55✔
410
  SDecoder       coder = {0};
55✔
411
  uDebug("alter stable data:%p", metaRsp);
55!
412

413
  // decode and process req
414
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
55✔
415
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
55✔
416
  tDecoderInit(&coder, data, len);
55✔
417

418
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
55!
419
    goto end;
×
420
  }
421
  buildAlterSTableJson(req.alterOriData, req.alterOriDataLen, pJson);
55✔
422

423
end:
55✔
424
  uDebug("alter stable return");
55!
425
  tDecoderClear(&coder);
55✔
426
}
427

428
static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
160✔
429
  if (json == NULL || pCreateReq == NULL) {
160!
430
    uError("invalid parameter in %s", __func__);
×
431
    return;
×
432
  }
433
  STag*   pTag = (STag*)pCreateReq->ctb.pTag;
160✔
434
  char*   sname = pCreateReq->ctb.stbName;
160✔
435
  char*   name = pCreateReq->name;
160✔
436
  SArray* tagName = pCreateReq->ctb.tagName;
160✔
437
  int64_t id = pCreateReq->uid;
160✔
438
  uint8_t tagNum = pCreateReq->ctb.tagNum;
160✔
439
  int32_t code = 0;
160✔
440
  SArray* pTagVals = NULL;
160✔
441
  char*   pJson = NULL;
160✔
442

443
  cJSON*  tableName = cJSON_CreateString(name);
160✔
444
  RAW_NULL_CHECK(tableName);
160!
445
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
160!
446
  cJSON* using = cJSON_CreateString(sname);
160✔
447
  RAW_NULL_CHECK(using);
160!
448
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "using", using));
160!
449
  cJSON* tagNumJson = cJSON_CreateNumber(tagNum);
160✔
450
  RAW_NULL_CHECK(tagNumJson);
160!
451
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tagNum", tagNumJson));
160!
452

453
  cJSON* tags = cJSON_CreateArray();
160✔
454
  RAW_NULL_CHECK(tags);
160!
455
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
160!
456
  RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals));
160!
457
  if (tTagIsJson(pTag)) {
160✔
458
    STag* p = (STag*)pTag;
22✔
459
    if (p->nTag == 0) {
22✔
460
      uError("p->nTag == 0");
11!
461
      goto end;
11✔
462
    }
463
    parseTagDatatoJson(pTag, &pJson, NULL);
11✔
464
    RAW_NULL_CHECK(pJson);
11!
465
    cJSON* tag = cJSON_CreateObject();
11✔
466
    RAW_NULL_CHECK(tag);
11!
467
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
11!
468
    STagVal* pTagVal = taosArrayGet(pTagVals, 0);
11✔
469
    RAW_NULL_CHECK(pTagVal);
11!
470
    char* ptname = taosArrayGet(tagName, 0);
11✔
471
    RAW_NULL_CHECK(ptname);
11!
472
    cJSON* tname = cJSON_CreateString(ptname);
11✔
473
    RAW_NULL_CHECK(tname);
11!
474
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
11!
475
    cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
11✔
476
    RAW_NULL_CHECK(ttype);
11!
477
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
11!
478
    cJSON* tvalue = cJSON_CreateString(pJson);
11✔
479
    RAW_NULL_CHECK(tvalue);
11!
480
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
11!
481
    goto end;
11✔
482
  }
483

484
  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
481✔
485
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
343✔
486
    RAW_NULL_CHECK(pTagVal);
343!
487
    cJSON* tag = cJSON_CreateObject();
343✔
488
    RAW_NULL_CHECK(tag);
343!
489
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
343!
490
    char* ptname = taosArrayGet(tagName, i);
343✔
491
    RAW_NULL_CHECK(ptname);
343!
492
    cJSON* tname = cJSON_CreateString(ptname);
343✔
493
    RAW_NULL_CHECK(tname);
343!
494
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
343!
495
    cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
343✔
496
    RAW_NULL_CHECK(ttype);
343!
497
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
343!
498

499
    cJSON* tvalue = NULL;
343✔
500
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
451!
501
      int64_t bufSize = 0;
108✔
502
      if (pTagVal->type == TSDB_DATA_TYPE_VARBINARY) {
108!
503
        bufSize = pTagVal->nData * 2 + 2 + 3;
×
504
      } else {
505
        bufSize = pTagVal->nData + 3;
108✔
506
      }
507
      char* buf = taosMemoryCalloc(bufSize, 1);
108!
508
      RAW_NULL_CHECK(buf);
108!
509
      if (dataConverToStr(buf, bufSize, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL) != TSDB_CODE_SUCCESS) {
108!
510
        taosMemoryFree(buf);
×
511
        goto end;
×
512
      }
513

514
      tvalue = cJSON_CreateString(buf);
108✔
515
      taosMemoryFree(buf);
108!
516
      RAW_NULL_CHECK(tvalue);
108!
517
    } else {
518
      double val = 0;
235✔
519
      GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64, 0); // currently tag type can't be decimal, so pass 0 as typeMod
235!
520
      tvalue = cJSON_CreateNumber(val);
235✔
521
      RAW_NULL_CHECK(tvalue);
235!
522
    }
523

524
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
343!
525
  }
526

527
end:
138✔
528
  taosMemoryFree(pJson);
160!
529
  taosArrayDestroy(pTagVals);
160✔
530
}
531

532
static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) {
126✔
533
  if (pJson == NULL || pCreateReq == NULL) {
126!
534
    uError("invalid parameter in %s", __func__);
×
535
    return;
×
536
  }
537
  int32_t code = 0;
126✔
538
  char*   string = NULL;
126✔
539
  cJSON*  json = cJSON_CreateObject();
126✔
540
  RAW_NULL_CHECK(json);
126!
541
  cJSON* type = cJSON_CreateString("create");
126✔
542
  RAW_NULL_CHECK(type);
126!
543
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
126!
544

545
  cJSON* tableType = cJSON_CreateString("child");
126✔
546
  RAW_NULL_CHECK(tableType);
126!
547
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
126!
548

549
  buildChildElement(json, pCreateReq);
126✔
550
  cJSON* createList = cJSON_CreateArray();
126✔
551
  RAW_NULL_CHECK(createList);
126!
552
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "createList", createList));
126!
553

554
  for (int i = 0; nReqs > 1 && i < nReqs; i++) {
160✔
555
    cJSON* create = cJSON_CreateObject();
34✔
556
    RAW_NULL_CHECK(create);
34!
557
    buildChildElement(create, pCreateReq + i);
34✔
558
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(createList, create));
34!
559
  }
560

561
end:
126✔
562
  *pJson = json;
126✔
563
}
564

565
static void processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
136✔
566
  if (pJson == NULL || metaRsp == NULL) {
136!
567
    uError("invalid parameter in %s", __func__);
×
568
    return;
×
569
  }
570
  SDecoder           decoder = {0};
136✔
571
  SVCreateTbBatchReq req = {0};
136✔
572
  SVCreateTbReq*     pCreateReq;
573
  // decode
574
  uDebug("create table data:%p", metaRsp);
136!
575
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
136✔
576
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
136✔
577
  tDecoderInit(&decoder, data, len);
136✔
578
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
136!
579
    goto end;
×
580
  }
581

582
  // loop to create table
583
  if (req.nReqs > 0) {
136!
584
    pCreateReq = req.pReqs;
136✔
585
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
136✔
586
      buildCreateCTableJson(req.pReqs, req.nReqs, pJson);
116✔
587
    } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
20!
588
      buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE,
20✔
589
                           &pCreateReq->colCmpr, pJson);
590
    }
591
  }
592

593
end:
×
594
  uDebug("create table return");
136!
595
  tDeleteSVCreateTbBatchReq(&req);
136✔
596
  tDecoderClear(&decoder);
136✔
597
}
598

599
static void processAutoCreateTable(SMqDataRsp* rsp, char** string) {
10✔
600
  if (rsp == NULL || string == NULL) {
10!
601
    uError("invalid parameter in %s", __func__);
×
602
    return;
×
603
  }
604
  SDecoder*      decoder = NULL;
10✔
605
  SVCreateTbReq* pCreateReq = NULL;
10✔
606
  int32_t        code = 0;
10✔
607
  uDebug("auto create table data:%p", rsp);
10!
608
  if (rsp->createTableNum <= 0) {
10!
609
    uError("processAutoCreateTable rsp->createTableNum <= 0");
×
610
    goto end;
×
611
  }
612

613
  decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
10!
614
  RAW_NULL_CHECK(decoder);
10!
615
  pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
10!
616
  RAW_NULL_CHECK(pCreateReq);
10!
617

618
  // loop to create table
619
  for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) {
29✔
620
    // decode
621
    void** data = taosArrayGet(rsp->createTableReq, iReq);
19✔
622
    RAW_NULL_CHECK(data);
19!
623
    int32_t* len = taosArrayGet(rsp->createTableLen, iReq);
19✔
624
    RAW_NULL_CHECK(len);
19!
625
    tDecoderInit(&decoder[iReq], *data, *len);
19✔
626
    if (tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq) < 0) {
19!
627
      goto end;
×
628
    }
629

630
    if (pCreateReq[iReq].type != TSDB_CHILD_TABLE) {
19!
631
      uError("processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE");
×
632
      goto end;
×
633
    }
634
  }
635
  cJSON* pJson = NULL;
10✔
636
  buildCreateCTableJson(pCreateReq, rsp->createTableNum, &pJson);
10✔
637
  *string = cJSON_PrintUnformatted(pJson);
10✔
638
  cJSON_Delete(pJson);
10✔
639

640
end:
10✔
641
  uDebug("auto created table return, sql json:%s", *string);
10!
642
  for (int i = 0; decoder && pCreateReq && i < rsp->createTableNum; i++) {
29!
643
    tDecoderClear(&decoder[i]);
19✔
644
    taosMemoryFreeClear(pCreateReq[i].comment);
19!
645
    if (pCreateReq[i].type == TSDB_CHILD_TABLE) {
19!
646
      taosArrayDestroy(pCreateReq[i].ctb.tagName);
19✔
647
    }
648
  }
649
  taosMemoryFree(decoder);
10!
650
  taosMemoryFree(pCreateReq);
10!
651
}
652

653
static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
30✔
654
  if (pJson == NULL || metaRsp == NULL) {
30!
655
    uError("invalid parameter in %s", __func__);
×
656
    return;
×
657
  }
658
  SDecoder     decoder = {0};
30✔
659
  SVAlterTbReq vAlterTbReq = {0};
30✔
660
  char*        string = NULL;
30✔
661
  cJSON*       json = NULL;
30✔
662
  int32_t      code = 0;
30✔
663

664
  uDebug("alter table data:%p", metaRsp);
30!
665
  // decode
666
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
30✔
667
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
30✔
668
  tDecoderInit(&decoder, data, len);
30✔
669
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
30!
670
    uError("tDecodeSVAlterTbReq error");
×
671
    goto end;
×
672
  }
673

674
  json = cJSON_CreateObject();
30✔
675
  RAW_NULL_CHECK(json);
30!
676
  cJSON* type = cJSON_CreateString("alter");
30✔
677
  RAW_NULL_CHECK(type);
30!
678
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
30!
679
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
55✔
680
                                                vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL
25!
681
                                            ? "child"
682
                                            : "normal");
683
  RAW_NULL_CHECK(tableType);
30!
684
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
30!
685
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
30✔
686
  RAW_NULL_CHECK(tableName);
30!
687
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
30!
688
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
30✔
689
  RAW_NULL_CHECK(alterType);
30!
690
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));
30!
691

692
  switch (vAlterTbReq.action) {
30!
693
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
5✔
694
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
5✔
695
      RAW_NULL_CHECK(colName);
5!
696
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
5!
697
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
5✔
698
      RAW_NULL_CHECK(colType);
5!
699
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
5!
700

701
      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
5!
702
          vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
5!
703
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
×
704
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
705
        RAW_NULL_CHECK(cbytes);
×
706
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
707
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
5!
708
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
×
709
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
710
        RAW_NULL_CHECK(cbytes);
×
711
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
712
      }
713
      break;
5✔
714
    }
715
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
×
716
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
×
717
      RAW_NULL_CHECK(colName);
×
718
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
×
719
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
×
720
      RAW_NULL_CHECK(colType);
×
721
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
×
722

723
      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
×
724
          vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
×
725
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
×
726
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
727
        RAW_NULL_CHECK(cbytes);
×
728
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
729
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
×
730
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
×
731
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
732
        RAW_NULL_CHECK(cbytes);
×
733
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
734
      }
735
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
×
736
      break;
×
737
    }
738
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
5✔
739
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
5✔
740
      RAW_NULL_CHECK(colName);
5!
741
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
5!
742
      break;
5✔
743
    }
744
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
5✔
745
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
5✔
746
      RAW_NULL_CHECK(colName);
5!
747
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
5!
748
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
5✔
749
      RAW_NULL_CHECK(colType);
5!
750
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
5!
751
      if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_VARBINARY ||
5!
752
          vAlterTbReq.colModType == TSDB_DATA_TYPE_GEOMETRY) {
5!
753
        int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
×
754
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
755
        RAW_NULL_CHECK(cbytes);
×
756
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
757
      } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
5!
758
        int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
5✔
759
        cJSON*  cbytes = cJSON_CreateNumber(length);
5✔
760
        RAW_NULL_CHECK(cbytes);
5!
761
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
5!
762
      }
763
      break;
5✔
764
    }
765
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
5✔
766
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
5✔
767
      RAW_NULL_CHECK(colName);
5!
768
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
5!
769
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
5✔
770
      RAW_NULL_CHECK(colNewName);
5!
771
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName));
5!
772
      break;
5✔
773
    }
774
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
5✔
775
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
5✔
776
      RAW_NULL_CHECK(tagName);
5!
777
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", tagName));
5!
778

779
      bool isNull = vAlterTbReq.isNull;
5✔
780
      if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
5!
781
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
×
782
        if (jsonTag->nTag == 0) isNull = true;
×
783
      }
784
      if (!isNull) {
5!
785
        char* buf = NULL;
5✔
786

787
        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
5!
788
          if (!tTagIsJson(vAlterTbReq.pTagVal)) {
×
789
            uError("processAlterTable isJson false");
×
790
            goto end;
×
791
          }
792
          parseTagDatatoJson(vAlterTbReq.pTagVal, &buf, NULL);
×
793
          if (buf == NULL) {
×
794
            uError("parseTagDatatoJson failed, buf == NULL");
×
795
            goto end;
×
796
          }
797
        } else {
798
          int64_t bufSize = 0;
5✔
799
          if (vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY) {
5!
800
            bufSize = vAlterTbReq.nTagVal * 2 + 2 + 3;
×
801
          } else {
802
            bufSize = vAlterTbReq.nTagVal + 32;
5✔
803
          }
804
          buf = taosMemoryCalloc(bufSize, 1);
5!
805
          RAW_NULL_CHECK(buf);
5!
806
          if (dataConverToStr(buf, bufSize, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL) !=
5!
807
              TSDB_CODE_SUCCESS) {
808
            taosMemoryFree(buf);
×
809
            goto end;
×
810
          }
811
        }
812

813
        cJSON* colValue = cJSON_CreateString(buf);
5✔
814
        taosMemoryFree(buf);
5!
815
        RAW_NULL_CHECK(colValue);
5!
816
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValue", colValue));
5!
817
      }
818

819
      cJSON* isNullCJson = cJSON_CreateBool(isNull);
5✔
820
      RAW_NULL_CHECK(isNullCJson);
5!
821
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValueNull", isNullCJson));
5!
822
      break;
5✔
823
    }
824
    case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: {
×
825
      int32_t nTags = taosArrayGetSize(vAlterTbReq.pMultiTag);
×
826
      if (nTags <= 0) {
×
827
        uError("processAlterTable parse multi tags error");
×
828
        goto end;
×
829
      }
830

831
      cJSON* tags = cJSON_CreateArray();
×
832
      RAW_NULL_CHECK(tags);
×
833
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
×
834

835
      for (int32_t i = 0; i < nTags; i++) {
×
836
        cJSON* member = cJSON_CreateObject();
×
837
        RAW_NULL_CHECK(member);
×
838
        RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, member));
×
839

840
        SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
×
841
        cJSON*             tagName = cJSON_CreateString(pTagVal->tagName);
×
842
        RAW_NULL_CHECK(tagName);
×
843
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colName", tagName));
×
844

845
        if (pTagVal->tagType == TSDB_DATA_TYPE_JSON) {
×
846
          uError("processAlterTable isJson false");
×
847
          goto end;
×
848
        }
849
        bool isNull = pTagVal->isNull;
×
850
        if (!isNull) {
×
851
          int64_t bufSize = 0;
×
852
          if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
×
853
            bufSize = pTagVal->nTagVal * 2 + 2 + 3;
×
854
          } else {
855
            bufSize = pTagVal->nTagVal + 3;
×
856
          }
857
          char* buf = taosMemoryCalloc(bufSize, 1);
×
858
          RAW_NULL_CHECK(buf);
×
859
          if (dataConverToStr(buf, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL) !=
×
860
              TSDB_CODE_SUCCESS) {
861
            taosMemoryFree(buf);
×
862
            goto end;
×
863
          }
864
          cJSON* colValue = cJSON_CreateString(buf);
×
865
          taosMemoryFree(buf);
×
866
          RAW_NULL_CHECK(colValue);
×
867
          RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValue", colValue));
×
868
        }
869
        cJSON* isNullCJson = cJSON_CreateBool(isNull);
×
870
        RAW_NULL_CHECK(isNullCJson);
×
871
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValueNull", isNullCJson));
×
872
      }
873
      break;
×
874
    }
875

876
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
×
877
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
×
878
      RAW_NULL_CHECK(colName);
×
879
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
×
880
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
×
881
      break;
×
882
    }
883
    default:
5✔
884
      break;
5✔
885
  }
886

887
end:
30✔
888
  uDebug("alter table return");
30!
889
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
30!
890
    taosArrayDestroy(vAlterTbReq.pMultiTag);
×
891
  }
892
  tDecoderClear(&decoder);
30✔
893
  *pJson = json;
30✔
894
}
895

896
static void processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
8✔
897
  if (pJson == NULL || metaRsp == NULL) {
8!
898
    uError("invalid parameter in %s", __func__);
×
899
    return;
×
900
  }
901
  SDecoder     decoder = {0};
8✔
902
  SVDropStbReq req = {0};
8✔
903
  cJSON*       json = NULL;
8✔
904
  int32_t      code = 0;
8✔
905

906
  uDebug("processDropSTable data:%p", metaRsp);
8!
907

908
  // decode
909
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
8✔
910
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
8✔
911
  tDecoderInit(&decoder, data, len);
8✔
912
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
8!
913
    uError("tDecodeSVDropStbReq failed");
×
914
    goto end;
×
915
  }
916

917
  json = cJSON_CreateObject();
8✔
918
  RAW_NULL_CHECK(json);
8!
919
  cJSON* type = cJSON_CreateString("drop");
8✔
920
  RAW_NULL_CHECK(type);
8!
921
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
8!
922
  cJSON* tableType = cJSON_CreateString("super");
8✔
923
  RAW_NULL_CHECK(tableType);
8!
924
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
8!
925
  cJSON* tableName = cJSON_CreateString(req.name);
8✔
926
  RAW_NULL_CHECK(tableName);
8!
927
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
8!
928

929
end:
8✔
930
  uDebug("processDropSTable return");
8!
931
  tDecoderClear(&decoder);
8✔
932
  *pJson = json;
8✔
933
}
934
static void processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
4✔
935
  if (pJson == NULL || metaRsp == NULL) {
4!
936
    uError("invalid parameter in %s", __func__);
×
937
    return;
×
938
  }
939
  SDeleteRes req = {0};
4✔
940
  SDecoder   coder = {0};
4✔
941
  cJSON*     json = NULL;
4✔
942
  int32_t    code = 0;
4✔
943

944
  uDebug("processDeleteTable data:%p", metaRsp);
4!
945
  // decode and process req
946
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
4✔
947
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
4✔
948

949
  tDecoderInit(&coder, data, len);
4✔
950
  if (tDecodeDeleteRes(&coder, &req) < 0) {
4!
951
    uError("tDecodeDeleteRes failed");
×
952
    goto end;
×
953
  }
954

955
  //  getTbName(req.tableFName);
956
  char sql[256] = {0};
4✔
957
  (void)snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
4✔
958
                 req.tsColName, req.skey, req.tsColName, req.ekey);
959

960
  json = cJSON_CreateObject();
4✔
961
  RAW_NULL_CHECK(json);
4!
962
  cJSON* type = cJSON_CreateString("delete");
4✔
963
  RAW_NULL_CHECK(type);
4!
964
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
4!
965
  cJSON* sqlJson = cJSON_CreateString(sql);
4✔
966
  RAW_NULL_CHECK(sqlJson);
4!
967
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "sql", sqlJson));
4!
968

969
end:
4✔
970
  uDebug("processDeleteTable return");
4!
971
  tDecoderClear(&coder);
4✔
972
  *pJson = json;
4✔
973
}
974

975
static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
5✔
976
  if (pJson == NULL || metaRsp == NULL) {
5!
977
    uError("invalid parameter in %s", __func__);
×
978
    return;
×
979
  }
980
  SDecoder         decoder = {0};
5✔
981
  SVDropTbBatchReq req = {0};
5✔
982
  cJSON*           json = NULL;
5✔
983
  int32_t          code = 0;
5✔
984

985
  uDebug("processDropTable data:%p", metaRsp);
5!
986
  // decode
987
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
5✔
988
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
5✔
989
  tDecoderInit(&decoder, data, len);
5✔
990
  if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
5!
991
    uError("tDecodeSVDropTbBatchReq failed");
×
992
    goto end;
×
993
  }
994

995
  json = cJSON_CreateObject();
5✔
996
  RAW_NULL_CHECK(json);
5!
997
  cJSON* type = cJSON_CreateString("drop");
5✔
998
  RAW_NULL_CHECK(type);
5!
999
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
5!
1000
  cJSON* tableNameList = cJSON_CreateArray();
5✔
1001
  RAW_NULL_CHECK(tableNameList);
5!
1002
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableNameList", tableNameList));
5!
1003

1004
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
11✔
1005
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;
6✔
1006
    cJSON*       tableName = cJSON_CreateString(pDropTbReq->name);
6✔
1007
    RAW_NULL_CHECK(tableName);
6!
1008
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tableNameList, tableName));
6!
1009
  }
1010

1011
end:
5✔
1012
  uDebug("processDropTable return");
5!
1013
  tDecoderClear(&decoder);
5✔
1014
  *pJson = json;
5✔
1015
}
1016

1017
static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
144✔
1018
  if (taos == NULL || meta == NULL) {
144!
1019
    uError("invalid parameter in %s", __func__);
×
1020
    return TSDB_CODE_INVALID_PARA;
×
1021
  }
1022
  SVCreateStbReq req = {0};
144✔
1023
  SDecoder       coder = {0};
144✔
1024
  SMCreateStbReq pReq = {0};
144✔
1025
  int32_t        code = TSDB_CODE_SUCCESS;
144✔
1026
  SRequestObj*   pRequest = NULL;
144✔
1027

1028
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
144!
1029
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
144!
1030
  pRequest->syncQuery = true;
144✔
1031
  if (!pRequest->pDb) {
144!
1032
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1033
    goto end;
×
1034
  }
1035
  // decode and process req
1036
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
144✔
1037
  uint32_t len = metaLen - sizeof(SMsgHead);
144✔
1038
  tDecoderInit(&coder, data, len);
144✔
1039
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
144!
1040
    code = TSDB_CODE_INVALID_PARA;
×
1041
    goto end;
×
1042
  }
1043

1044
  int8_t           createDefaultCompress = 0;
144✔
1045
  SColCmprWrapper* p = &req.colCmpr;
144✔
1046
  if (p->nCols == 0) {
144!
1047
    createDefaultCompress = 1;
×
1048
  }
1049
  // build create stable
1050
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
144✔
1051
  RAW_NULL_CHECK(pReq.pColumns);
144!
1052
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
880✔
1053
    SSchema*          pSchema = req.schemaRow.pSchema + i;
736✔
1054
    SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
736✔
1055
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
736✔
1056

1057
    if (createDefaultCompress) {
736!
1058
      field.compress = createDefaultColCmprByType(pSchema->type);
×
1059
    } else {
1060
      SColCmpr* pCmp = &req.colCmpr.pColCmpr[i];
736✔
1061
      field.compress = pCmp->alg;
736✔
1062
    }
1063
    if (req.pExtSchemas) field.typeMod = req.pExtSchemas[i].typeMod;
736✔
1064
    RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field));
1,472!
1065
  }
1066
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
144✔
1067
  RAW_NULL_CHECK(pReq.pTags);
144!
1068
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
541✔
1069
    SSchema* pSchema = req.schemaTag.pSchema + i;
397✔
1070
    SField   field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
397✔
1071
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
397✔
1072
    RAW_NULL_CHECK(taosArrayPush(pReq.pTags, &field));
794!
1073
  }
1074

1075
  pReq.colVer = req.schemaRow.version;
144✔
1076
  pReq.tagVer = req.schemaTag.version;
144✔
1077
  pReq.numOfColumns = req.schemaRow.nCols;
144✔
1078
  pReq.numOfTags = req.schemaTag.nCols;
144✔
1079
  pReq.commentLen = -1;
144✔
1080
  pReq.suid = processSuid(req.suid, pRequest->pDb);
144✔
1081
  pReq.source = TD_REQ_FROM_TAOX;
144✔
1082
  pReq.igExists = true;
144✔
1083

1084
  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
144!
1085
         pReq.suid);
1086
  STscObj* pTscObj = pRequest->pTscObj;
144✔
1087
  SName    tableName = {0};
144✔
1088
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
144✔
1089
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
144!
1090
  SCmdMsgInfo pCmdMsg = {0};
144✔
1091
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
144✔
1092
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
144✔
1093
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
144✔
1094
  if (pCmdMsg.msgLen <= 0) {
144!
1095
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1096
    goto end;
×
1097
  }
1098
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
144!
1099
  RAW_NULL_CHECK(pCmdMsg.pMsg);
144!
1100
  if (tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) <= 0) {
144!
1101
    code = TSDB_CODE_INVALID_PARA;
×
1102
    taosMemoryFree(pCmdMsg.pMsg);
×
UNCOV
1103
    goto end;
×
1104
  }
1105

1106
  SQuery pQuery = {0};
144✔
1107
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
144✔
1108
  pQuery.pCmdMsg = &pCmdMsg;
144✔
1109
  pQuery.msgType = pQuery.pCmdMsg->msgType;
144✔
1110
  pQuery.stableQuery = true;
144✔
1111

1112
  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
144✔
1113

1114
  taosMemoryFree(pCmdMsg.pMsg);
144!
1115

1116
  if (pRequest->code == TSDB_CODE_SUCCESS) {
144!
1117
    SCatalog* pCatalog = NULL;
144✔
1118
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
144!
1119
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
144!
1120
  }
1121

1122
  code = pRequest->code;
144✔
1123

1124
end:
144✔
1125
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
144!
1126
  destroyRequest(pRequest);
144✔
1127
  tFreeSMCreateStbReq(&pReq);
144✔
1128
  tDecoderClear(&coder);
144✔
1129
  return code;
144✔
1130
}
1131

1132
static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) {
8✔
1133
  if (taos == NULL || meta == NULL) {
8!
1134
    uError("invalid parameter in %s", __func__);
×
UNCOV
1135
    return TSDB_CODE_INVALID_PARA;
×
1136
  }
1137
  SVDropStbReq req = {0};
8✔
1138
  SDecoder     coder = {0};
8✔
1139
  SMDropStbReq pReq = {0};
8✔
1140
  int32_t      code = TSDB_CODE_SUCCESS;
8✔
1141
  SRequestObj* pRequest = NULL;
8✔
1142

1143
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
8!
1144
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
8!
1145
  pRequest->syncQuery = true;
8✔
1146
  if (!pRequest->pDb) {
8!
1147
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1148
    goto end;
×
1149
  }
1150
  // decode and process req
1151
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
8✔
1152
  uint32_t len = metaLen - sizeof(SMsgHead);
8✔
1153
  tDecoderInit(&coder, data, len);
8✔
1154
  if (tDecodeSVDropStbReq(&coder, &req) < 0) {
8!
1155
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1156
    goto end;
×
1157
  }
1158

1159
  SCatalog* pCatalog = NULL;
8✔
1160
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
8!
1161
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
8✔
1162
                           .requestId = pRequest->requestId,
8✔
1163
                           .requestObjRefId = pRequest->self,
8✔
1164
                           .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
8✔
1165
  SName            pName = {0};
8✔
1166
  toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName);
8✔
1167
  STableMeta* pTableMeta = NULL;
8✔
1168
  code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
8✔
1169
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
8✔
1170
    code = TSDB_CODE_SUCCESS;
2✔
1171
    taosMemoryFreeClear(pTableMeta);
2!
1172
    goto end;
2✔
1173
  }
1174
  if (code != TSDB_CODE_SUCCESS) {
6!
UNCOV
1175
    goto end;
×
1176
  }
1177
  pReq.suid = pTableMeta->uid;
6✔
1178
  taosMemoryFreeClear(pTableMeta);
6!
1179

1180
  // build drop stable
1181
  pReq.igNotExists = true;
6✔
1182
  pReq.source = TD_REQ_FROM_TAOX;
6✔
1183
  //  pReq.suid = processSuid(req.suid, pRequest->pDb);
1184

1185
  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
6!
1186
         pReq.suid);
1187
  STscObj* pTscObj = pRequest->pTscObj;
6✔
1188
  SName    tableName = {0};
6✔
1189
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
6✔
1190
  if (tNameExtractFullName(&tableName, pReq.name) != 0) {
6!
1191
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1192
    goto end;
×
1193
  }
1194

1195
  SCmdMsgInfo pCmdMsg = {0};
6✔
1196
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
6✔
1197
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
6✔
1198
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
6✔
1199
  if (pCmdMsg.msgLen <= 0) {
6!
1200
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1201
    goto end;
×
1202
  }
1203
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
6!
1204
  RAW_NULL_CHECK(pCmdMsg.pMsg);
6!
1205
  if (tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) <= 0) {
6!
1206
    code = TSDB_CODE_INVALID_PARA;
×
1207
    taosMemoryFree(pCmdMsg.pMsg);
×
UNCOV
1208
    goto end;
×
1209
  }
1210

1211
  SQuery pQuery = {0};
6✔
1212
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
6✔
1213
  pQuery.pCmdMsg = &pCmdMsg;
6✔
1214
  pQuery.msgType = pQuery.pCmdMsg->msgType;
6✔
1215
  pQuery.stableQuery = true;
6✔
1216

1217
  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
6✔
1218
  taosMemoryFree(pCmdMsg.pMsg);
6!
1219
  if (pRequest->code == TSDB_CODE_SUCCESS) {
6!
1220
    // ignore the error code
1221
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
6!
1222
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
6!
1223
  }
1224

1225
  code = pRequest->code;
6✔
1226

1227
end:
8✔
1228
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
8!
1229
  destroyRequest(pRequest);
8✔
1230
  tDecoderClear(&coder);
8✔
1231
  return code;
8✔
1232
}
1233

1234
typedef struct SVgroupCreateTableBatch {
1235
  SVCreateTbBatchReq req;
1236
  SVgroupInfo        info;
1237
  char               dbName[TSDB_DB_NAME_LEN];
1238
} SVgroupCreateTableBatch;
1239

1240
static void destroyCreateTbReqBatch(void* data) {
139✔
1241
  if (data == NULL) {
139!
1242
    uError("invalid parameter in %s", __func__);
×
UNCOV
1243
    return;
×
1244
  }
1245
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
139✔
1246
  taosArrayDestroy(pTbBatch->req.pArray);
139✔
1247
}
1248

1249
static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) {
136✔
1250
  if (taos == NULL || meta == NULL) {
136!
1251
    uError("invalid parameter in %s", __func__);
×
UNCOV
1252
    return TSDB_CODE_INVALID_PARA;
×
1253
  }
1254
  SVCreateTbBatchReq req = {0};
136✔
1255
  SDecoder           coder = {0};
136✔
1256
  int32_t            code = TSDB_CODE_SUCCESS;
136✔
1257
  SRequestObj*       pRequest = NULL;
136✔
1258
  SQuery*            pQuery = NULL;
136✔
1259
  SHashObj*          pVgroupHashmap = NULL;
136✔
1260
  SArray*            pTagList = taosArrayInit(0, POINTER_BYTES);
136✔
1261
  RAW_NULL_CHECK(pTagList);
136!
1262
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
136!
1263
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
136!
1264

1265
  pRequest->syncQuery = true;
136✔
1266
  if (!pRequest->pDb) {
136!
1267
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1268
    goto end;
×
1269
  }
1270
  // decode and process req
1271
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
136✔
1272
  uint32_t len = metaLen - sizeof(SMsgHead);
136✔
1273
  tDecoderInit(&coder, data, len);
136✔
1274
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
136!
1275
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1276
    goto end;
×
1277
  }
1278

1279
  STscObj* pTscObj = pRequest->pTscObj;
136✔
1280

1281
  SVCreateTbReq* pCreateReq = NULL;
136✔
1282
  SCatalog*      pCatalog = NULL;
136✔
1283
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
136!
1284
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
136✔
1285
  RAW_NULL_CHECK(pVgroupHashmap);
136!
1286
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
136✔
1287

1288
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
136✔
1289
                           .requestId = pRequest->requestId,
136✔
1290
                           .requestObjRefId = pRequest->self,
136✔
1291
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
136✔
1292

1293
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
136✔
1294
  RAW_NULL_CHECK(pRequest->tableList);
136!
1295
  // loop to create table
1296
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
287✔
1297
    pCreateReq = req.pReqs + iReq;
151✔
1298

1299
    SVgroupInfo pInfo = {0};
151✔
1300
    SName       pName = {0};
151✔
1301
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
151✔
1302
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
151✔
1303
    if (code != TSDB_CODE_SUCCESS) {
151!
UNCOV
1304
      goto end;
×
1305
    }
1306

1307
    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
151✔
1308
    // change tag cid to new cid
1309
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
151✔
1310
      STableMeta* pTableMeta = NULL;
131✔
1311
      SName       sName = {0};
131✔
1312
      tb_uid_t    oldSuid = pCreateReq->ctb.suid;
131✔
1313
      //      pCreateReq->ctb.suid = processSuid(pCreateReq->ctb.suid, pRequest->pDb);
1314
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
131✔
1315
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
131✔
1316
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
131!
1317
        code = TSDB_CODE_SUCCESS;
×
1318
        taosMemoryFreeClear(pTableMeta);
×
UNCOV
1319
        continue;
×
1320
      }
1321

1322
      if (code != TSDB_CODE_SUCCESS) {
131!
UNCOV
1323
        goto end;
×
1324
      }
1325
      pCreateReq->ctb.suid = pTableMeta->uid;
131✔
1326

1327
      SArray* pTagVals = NULL;
131✔
1328
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
131✔
1329
      if (code != TSDB_CODE_SUCCESS) {
131!
1330
        taosMemoryFreeClear(pTableMeta);
×
UNCOV
1331
        goto end;
×
1332
      }
1333

1334
      bool rebuildTag = false;
131✔
1335
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
409✔
1336
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
278✔
1337
        if (tName == NULL) {
278!
UNCOV
1338
          continue;
×
1339
        }
1340
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
278✔
1341
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
1,098✔
1342
          SSchema* tag = &pTableMeta->schema[j];
820✔
1343
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
820✔
1344
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
262✔
1345
            if (pTagVal) {
262!
1346
              if (pTagVal->cid != tag->colId) {
262✔
1347
                pTagVal->cid = tag->colId;
21✔
1348
                rebuildTag = true;
21✔
1349
              }
1350
            } else {
UNCOV
1351
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
×
1352
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
1353
            }
1354
          }
1355
        }
1356
      }
1357
      taosMemoryFreeClear(pTableMeta);
131!
1358
      if (rebuildTag) {
131✔
1359
        STag* ppTag = NULL;
13✔
1360
        code = tTagNew(pTagVals, 1, false, &ppTag);
13✔
1361
        taosArrayDestroy(pTagVals);
13✔
1362
        pTagVals = NULL;
13✔
1363
        if (code != TSDB_CODE_SUCCESS) {
13!
UNCOV
1364
          goto end;
×
1365
        }
1366
        if (NULL == taosArrayPush(pTagList, &ppTag)) {
13!
1367
          tTagFree(ppTag);
×
UNCOV
1368
          goto end;
×
1369
        }
1370
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
13✔
1371
      }
1372
      taosArrayDestroy(pTagVals);
131✔
1373
    }
1374
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
302!
1375

1376
    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
151✔
1377
    if (pTableBatch == NULL) {
151✔
1378
      SVgroupCreateTableBatch tBatch = {0};
139✔
1379
      tBatch.info = pInfo;
139✔
1380
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);
139✔
1381

1382
      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
139✔
1383
      RAW_NULL_CHECK(tBatch.req.pArray);
139!
1384
      RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pCreateReq));
278!
1385
      tBatch.req.source = TD_REQ_FROM_TAOX;
139✔
1386
      RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)));
139!
1387
    } else {  // add to the correct vgroup
1388
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
24!
1389
    }
1390
  }
1391

1392
  if (taosHashGetSize(pVgroupHashmap) == 0) {
136!
UNCOV
1393
    goto end;
×
1394
  }
1395
  SArray* pBufArray = NULL;
136✔
1396
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
136!
1397
  pQuery = NULL;
136✔
1398
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
136✔
1399
  if (TSDB_CODE_SUCCESS != code) goto end;
136!
1400
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
136✔
1401
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
136✔
1402
  pQuery->stableQuery = false;
136✔
1403
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
136✔
1404
  if (TSDB_CODE_SUCCESS != code) goto end;
136!
1405
  RAW_NULL_CHECK(pQuery->pRoot);
136!
1406

1407
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));
136!
1408

1409
  launchQueryImpl(pRequest, pQuery, true, NULL);
136✔
1410
  if (pRequest->code == TSDB_CODE_SUCCESS) {
136!
1411
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
136!
1412
  }
1413

1414
  code = pRequest->code;
136✔
1415

1416
end:
136✔
1417
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
136!
1418
  tDeleteSVCreateTbBatchReq(&req);
136✔
1419

1420
  taosHashCleanup(pVgroupHashmap);
136✔
1421
  destroyRequest(pRequest);
136✔
1422
  tDecoderClear(&coder);
136✔
1423
  qDestroyQuery(pQuery);
136✔
1424
  taosArrayDestroyP(pTagList, NULL);
136✔
1425
  return code;
136✔
1426
}
1427

1428
typedef struct SVgroupDropTableBatch {
1429
  SVDropTbBatchReq req;
1430
  SVgroupInfo      info;
1431
  char             dbName[TSDB_DB_NAME_LEN];
1432
} SVgroupDropTableBatch;
1433

1434
static void destroyDropTbReqBatch(void* data) {
3✔
1435
  if (data == NULL) {
3!
1436
    uError("invalid parameter in %s", __func__);
×
UNCOV
1437
    return;
×
1438
  }
1439
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
3✔
1440
  taosArrayDestroy(pTbBatch->req.pArray);
3✔
1441
}
1442

1443
static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) {
5✔
1444
  if (taos == NULL || meta == NULL) {
5!
1445
    uError("invalid parameter in %s", __func__);
×
UNCOV
1446
    return TSDB_CODE_INVALID_PARA;
×
1447
  }
1448
  SVDropTbBatchReq req = {0};
5✔
1449
  SDecoder         coder = {0};
5✔
1450
  int32_t          code = TSDB_CODE_SUCCESS;
5✔
1451
  SRequestObj*     pRequest = NULL;
5✔
1452
  SQuery*          pQuery = NULL;
5✔
1453
  SHashObj*        pVgroupHashmap = NULL;
5✔
1454

1455
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
5!
1456
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
5!
1457

1458
  pRequest->syncQuery = true;
5✔
1459
  if (!pRequest->pDb) {
5!
1460
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1461
    goto end;
×
1462
  }
1463
  // decode and process req
1464
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
5✔
1465
  uint32_t len = metaLen - sizeof(SMsgHead);
5✔
1466
  tDecoderInit(&coder, data, len);
5✔
1467
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
5!
1468
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1469
    goto end;
×
1470
  }
1471

1472
  STscObj* pTscObj = pRequest->pTscObj;
5✔
1473

1474
  SVDropTbReq* pDropReq = NULL;
5✔
1475
  SCatalog*    pCatalog = NULL;
5✔
1476
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
5!
1477

1478
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
5✔
1479
  RAW_NULL_CHECK(pVgroupHashmap);
5!
1480
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
5✔
1481

1482
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
5✔
1483
                           .requestId = pRequest->requestId,
5✔
1484
                           .requestObjRefId = pRequest->self,
5✔
1485
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
5✔
1486
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
5✔
1487
  RAW_NULL_CHECK(pRequest->tableList);
5!
1488
  // loop to create table
1489
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
11✔
1490
    pDropReq = req.pReqs + iReq;
6✔
1491
    pDropReq->igNotExists = true;
6✔
1492
    //    pDropReq->suid = processSuid(pDropReq->suid, pRequest->pDb);
1493

1494
    SVgroupInfo pInfo = {0};
6✔
1495
    SName       pName = {0};
6✔
1496
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
6✔
1497
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
6!
1498

1499
    STableMeta* pTableMeta = NULL;
6✔
1500
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
6✔
1501
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
6✔
1502
      code = TSDB_CODE_SUCCESS;
2✔
1503
      taosMemoryFreeClear(pTableMeta);
2!
1504
      continue;
2✔
1505
    }
1506
    if (code != TSDB_CODE_SUCCESS) {
4!
UNCOV
1507
      goto end;
×
1508
    }
1509
    tb_uid_t oldSuid = pDropReq->suid;
4✔
1510
    pDropReq->suid = pTableMeta->suid;
4✔
1511
    taosMemoryFreeClear(pTableMeta);
4!
1512
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
4!
1513
           pDropReq->suid);
1514

1515
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
8!
1516
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
4✔
1517
    if (pTableBatch == NULL) {
4✔
1518
      SVgroupDropTableBatch tBatch = {0};
3✔
1519
      tBatch.info = pInfo;
3✔
1520
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
3✔
1521
      RAW_NULL_CHECK(tBatch.req.pArray);
3!
1522
      RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pDropReq));
6!
1523
      RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)));
3!
1524
    } else {  // add to the correct vgroup
1525
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
2!
1526
    }
1527
  }
1528

1529
  if (taosHashGetSize(pVgroupHashmap) == 0) {
5✔
1530
    goto end;
2✔
1531
  }
1532
  SArray* pBufArray = NULL;
3✔
1533
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
3!
1534
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
3✔
1535
  if (TSDB_CODE_SUCCESS != code) goto end;
3!
1536
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
3✔
1537
  pQuery->msgType = TDMT_VND_DROP_TABLE;
3✔
1538
  pQuery->stableQuery = false;
3✔
1539
  pQuery->pRoot = NULL;
3✔
1540
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
3✔
1541
  if (TSDB_CODE_SUCCESS != code) goto end;
3!
1542
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));
3!
1543

1544
  launchQueryImpl(pRequest, pQuery, true, NULL);
3✔
1545
  if (pRequest->code == TSDB_CODE_SUCCESS) {
3!
1546
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
3!
1547
  }
1548
  code = pRequest->code;
3✔
1549

1550
end:
5✔
1551
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
5!
1552
  taosHashCleanup(pVgroupHashmap);
5✔
1553
  destroyRequest(pRequest);
5✔
1554
  tDecoderClear(&coder);
5✔
1555
  qDestroyQuery(pQuery);
5✔
1556
  return code;
5✔
1557
}
1558

1559
static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) {
4✔
1560
  if (taos == NULL || meta == NULL) {
4!
1561
    uError("invalid parameter in %s", __func__);
×
UNCOV
1562
    return TSDB_CODE_INVALID_PARA;
×
1563
  }
1564
  SDeleteRes req = {0};
4✔
1565
  SDecoder   coder = {0};
4✔
1566
  char       sql[256] = {0};
4✔
1567
  int32_t    code = TSDB_CODE_SUCCESS;
4✔
1568

1569
  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);
4!
1570

1571
  // decode and process req
1572
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
4✔
1573
  uint32_t len = metaLen - sizeof(SMsgHead);
4✔
1574
  tDecoderInit(&coder, data, len);
4✔
1575
  if (tDecodeDeleteRes(&coder, &req) < 0) {
4!
1576
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1577
    goto end;
×
1578
  }
1579

1580
  (void)snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
4✔
1581
                 req.tsColName, req.skey, req.tsColName, req.ekey);
1582

1583
  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
4✔
1584
  RAW_NULL_CHECK(res);
4!
1585
  SRequestObj* pRequest = (SRequestObj*)res;
4✔
1586
  code = pRequest->code;
4✔
1587
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
4!
1588
    code = TSDB_CODE_SUCCESS;
1✔
1589
  }
1590
  taos_free_result(res);
4✔
1591

1592
end:
4✔
1593
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));
4!
1594
  tDecoderClear(&coder);
4✔
1595
  return code;
4✔
1596
}
1597

1598
static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) {
30✔
1599
  if (taos == NULL || meta == NULL) {
30!
1600
    uError("invalid parameter in %s", __func__);
×
UNCOV
1601
    return TSDB_CODE_INVALID_PARA;
×
1602
  }
1603
  SVAlterTbReq   req = {0};
30✔
1604
  SDecoder       dcoder = {0};
30✔
1605
  int32_t        code = TSDB_CODE_SUCCESS;
30✔
1606
  SRequestObj*   pRequest = NULL;
30✔
1607
  SQuery*        pQuery = NULL;
30✔
1608
  SArray*        pArray = NULL;
30✔
1609
  SVgDataBlocks* pVgData = NULL;
30✔
1610

1611
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
30!
1612
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
30!
1613
  pRequest->syncQuery = true;
30✔
1614
  if (!pRequest->pDb) {
30!
1615
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1616
    goto end;
×
1617
  }
1618
  // decode and process req
1619
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
30✔
1620
  uint32_t len = metaLen - sizeof(SMsgHead);
30✔
1621
  tDecoderInit(&dcoder, data, len);
30✔
1622
  if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
30!
1623
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
1624
    goto end;
×
1625
  }
1626

1627
  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
1628
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
30✔
1629
    goto end;
5✔
1630
  }
1631

1632
  STscObj*  pTscObj = pRequest->pTscObj;
25✔
1633
  SCatalog* pCatalog = NULL;
25✔
1634
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
25!
1635
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
25✔
1636
                           .requestId = pRequest->requestId,
25✔
1637
                           .requestObjRefId = pRequest->self,
25✔
1638
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
25✔
1639

1640
  SVgroupInfo pInfo = {0};
25✔
1641
  SName       pName = {0};
25✔
1642
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
25✔
1643
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
25!
1644
  pArray = taosArrayInit(1, sizeof(void*));
25✔
1645
  RAW_NULL_CHECK(pArray);
25!
1646

1647
  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
25!
1648
  RAW_NULL_CHECK(pVgData);
25!
1649
  pVgData->vg = pInfo;
25✔
1650

1651
  int tlen = 0;
25✔
1652
  req.source = TD_REQ_FROM_TAOX;
25✔
1653
  tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
25!
1654
  if (code != 0) {
25!
1655
    code = terrno;
×
UNCOV
1656
    goto end;
×
1657
  }
1658
  tlen += sizeof(SMsgHead);
25✔
1659
  void* pMsg = taosMemoryMalloc(tlen);
25!
1660
  RAW_NULL_CHECK(pMsg);
25!
1661
  ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
25✔
1662
  ((SMsgHead*)pMsg)->contLen = htonl(tlen);
25✔
1663
  void*    pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
25✔
1664
  SEncoder coder = {0};
25✔
1665
  tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
25✔
1666
  code = tEncodeSVAlterTbReq(&coder, &req);
25✔
1667
  if (code != 0) {
25!
1668
    tEncoderClear(&coder);
×
1669
    code = terrno;
×
UNCOV
1670
    goto end;
×
1671
  }
1672
  tEncoderClear(&coder);
25✔
1673

1674
  pVgData->pData = pMsg;
25✔
1675
  pVgData->size = tlen;
25✔
1676

1677
  pVgData->numOfTables = 1;
25✔
1678
  RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));
25!
1679

1680
  pQuery = NULL;
25✔
1681
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
25✔
1682
  if (NULL == pQuery) goto end;
25!
1683
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
25✔
1684
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
25✔
1685
  pQuery->stableQuery = false;
25✔
1686
  pQuery->pRoot = NULL;
25✔
1687
  code = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot);
25✔
1688
  if (TSDB_CODE_SUCCESS != code) goto end;
25!
1689
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));
25!
1690

1691
  launchQueryImpl(pRequest, pQuery, true, NULL);
25✔
1692

1693
  pVgData = NULL;
25✔
1694
  pArray = NULL;
25✔
1695
  code = pRequest->code;
25✔
1696
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
25✔
1697
    code = TSDB_CODE_SUCCESS;
1✔
1698
  }
1699

1700
  if (pRequest->code == TSDB_CODE_SUCCESS) {
25✔
1701
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
24✔
1702
    if (pRes->res != NULL) {
24✔
1703
      code = handleAlterTbExecRes(pRes->res, pCatalog);
20✔
1704
    }
1705
  }
1706
end:
5✔
1707
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));
30!
1708
  taosArrayDestroy(pArray);
30✔
1709
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
30!
1710
  taosMemoryFreeClear(pVgData);
30!
1711
  destroyRequest(pRequest);
30✔
1712
  tDecoderClear(&dcoder);
30✔
1713
  qDestroyQuery(pQuery);
30✔
1714
  return code;
30✔
1715
}
1716

1717
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
1✔
1718
                                     int numFields) {
1719
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, 0);
1✔
1720
}
1721

1722
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
1✔
1723
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
1724
  if (taos == NULL || pData == NULL || tbname == NULL) {
1!
1725
    uError("invalid parameter in %s", __func__);
×
UNCOV
1726
    return TSDB_CODE_INVALID_PARA;
×
1727
  }
1728
  int32_t     code = TSDB_CODE_SUCCESS;
1✔
1729
  STableMeta* pTableMeta = NULL;
1✔
1730
  SQuery*     pQuery = NULL;
1✔
1731
  SHashObj*   pVgHash = NULL;
1✔
1732

1733
  SRequestObj* pRequest = NULL;
1✔
1734
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
1!
1735

1736
  uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
1!
1737
         rows, pData, tbname, fields, numFields);
1738

1739
  pRequest->syncQuery = true;
1✔
1740
  if (!pRequest->pDb) {
1!
1741
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1742
    goto end;
×
1743
  }
1744

1745
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
1✔
1746
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
1✔
1747
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
1✔
1748

1749
  struct SCatalog* pCatalog = NULL;
1✔
1750
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
1!
1751

1752
  SRequestConnInfo conn = {0};
1✔
1753
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
1✔
1754
  conn.requestId = pRequest->requestId;
1✔
1755
  conn.requestObjRefId = pRequest->self;
1✔
1756
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
1✔
1757

1758
  SVgroupInfo vgData = {0};
1✔
1759
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
1!
1760
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
1!
1761
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
1!
1762
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
1✔
1763
  RAW_NULL_CHECK(pVgHash);
1!
1764
  RAW_RETURN_CHECK(
1!
1765
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1766
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
1!
1767
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
1!
1768

1769
  launchQueryImpl(pRequest, pQuery, true, NULL);
1✔
1770
  code = pRequest->code;
1✔
1771

1772
end:
1✔
1773
  uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code));
1!
1774
  taosMemoryFreeClear(pTableMeta);
1!
1775
  qDestroyQuery(pQuery);
1✔
1776
  destroyRequest(pRequest);
1✔
1777
  taosHashCleanup(pVgHash);
1✔
1778
  return code;
1✔
1779
}
1780

1781
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
7✔
1782
  return taos_write_raw_block_with_reqid(taos, rows, pData, tbname, 0);
7✔
1783
}
1784

1785
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
7✔
1786
  if (taos == NULL || pData == NULL || tbname == NULL) {
7!
UNCOV
1787
    return TSDB_CODE_INVALID_PARA;
×
1788
  }
1789
  int32_t     code = TSDB_CODE_SUCCESS;
7✔
1790
  STableMeta* pTableMeta = NULL;
7✔
1791
  SQuery*     pQuery = NULL;
7✔
1792
  SHashObj*   pVgHash = NULL;
7✔
1793

1794
  SRequestObj* pRequest = NULL;
7✔
1795
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
7!
1796

1797
  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);
7!
1798

1799
  pRequest->syncQuery = true;
7✔
1800
  if (!pRequest->pDb) {
7!
1801
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1802
    goto end;
×
1803
  }
1804

1805
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
7✔
1806
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
7✔
1807
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
7✔
1808

1809
  struct SCatalog* pCatalog = NULL;
7✔
1810
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
7!
1811

1812
  SRequestConnInfo conn = {0};
7✔
1813
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
7✔
1814
  conn.requestId = pRequest->requestId;
7✔
1815
  conn.requestObjRefId = pRequest->self;
7✔
1816
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
7✔
1817

1818
  SVgroupInfo vgData = {0};
7✔
1819
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
7!
1820
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
7✔
1821
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
6!
1822
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
6✔
1823
  RAW_NULL_CHECK(pVgHash);
6!
1824
  RAW_RETURN_CHECK(
6!
1825
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1826
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false));
6✔
1827
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
4!
1828

1829
  launchQueryImpl(pRequest, pQuery, true, NULL);
4✔
1830
  code = pRequest->code;
4✔
1831

1832
end:
7✔
1833
  uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code));
7!
1834
  taosMemoryFreeClear(pTableMeta);
7!
1835
  qDestroyQuery(pQuery);
7✔
1836
  destroyRequest(pRequest);
7✔
1837
  taosHashCleanup(pVgHash);
7✔
1838
  return code;
7✔
1839
}
1840

1841
static void* getRawDataFromRes(void* pRetrieve) {
145✔
1842
  if (pRetrieve == NULL) {
145!
1843
    uError("invalid parameter in %s", __func__);
×
UNCOV
1844
    return NULL;
×
1845
  }
1846
  void* rawData = NULL;
145✔
1847
  // deal with compatibility
1848
  if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
145!
UNCOV
1849
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
1850
  } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
145✔
1851
             *(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
17!
1852
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
145✔
1853
  }
1854
  return rawData;
145✔
1855
}
1856

1857
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
10✔
1858
  if (rsp == NULL || pHashObj == NULL) {
10!
1859
    uError("invalid parameter in %s", __func__);
×
UNCOV
1860
    return TSDB_CODE_INVALID_PARA;
×
1861
  }
1862
  // find schema data info
1863
  int32_t       code = 0;
10✔
1864
  SVCreateTbReq pCreateReq = {0};
10✔
1865
  SDecoder      decoderTmp = {0};
10✔
1866

1867
  for (int j = 0; j < rsp->createTableNum; j++) {
29✔
1868
    void** dataTmp = taosArrayGet(rsp->createTableReq, j);
19✔
1869
    RAW_NULL_CHECK(dataTmp);
19!
1870
    int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j);
19✔
1871
    RAW_NULL_CHECK(lenTmp);
19!
1872

1873
    tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
19✔
1874
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));
19!
1875

1876
    if (pCreateReq.type != TSDB_CHILD_TABLE) {
19!
1877
      code = TSDB_CODE_INVALID_MSG;
×
UNCOV
1878
      goto end;
×
1879
    }
1880
    if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL) {
19!
1881
      RAW_RETURN_CHECK(
19!
1882
          taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq)));
1883
    } else {
1884
      tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
×
UNCOV
1885
      pCreateReq = (SVCreateTbReq){0};
×
1886
    }
1887

1888
    tDecoderClear(&decoderTmp);
19✔
1889
  }
1890
  return 0;
10✔
1891

1892
end:
×
1893
  tDecoderClear(&decoderTmp);
×
1894
  tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
×
UNCOV
1895
  return code;
×
1896
}
1897

1898
typedef enum {
1899
  WRITE_RAW_INIT_START = 0,
1900
  WRITE_RAW_INIT_OK,
1901
  WRITE_RAW_INIT_FAIL,
1902
} WRITE_RAW_INIT_STATUS;
1903

1904
static SHashObj* writeRawCache = NULL;
1905
static int8_t    initFlag = 0;
1906
static int8_t    initedFlag = WRITE_RAW_INIT_START;
1907

1908
typedef struct {
1909
  SHashObj* pVgHash;
1910
  SHashObj* pNameHash;
1911
  SHashObj* pMetaHash;
1912
} rawCacheInfo;
1913

1914
typedef struct {
1915
  SVgroupInfo vgInfo;
1916
  int64_t     uid;
1917
  int64_t     suid;
1918
} tbInfo;
1919

1920
static void tmqFreeMeta(void* data) {
39✔
1921
  if (data == NULL) {
39!
1922
    uError("invalid parameter in %s", __func__);
×
UNCOV
1923
    return;
×
1924
  }
1925
  STableMeta* pTableMeta = *(STableMeta**)data;
39✔
1926
  taosMemoryFree(pTableMeta);
39!
1927
}
1928

1929
static void freeRawCache(void* data) {
×
1930
  if (data == NULL) {
×
1931
    uError("invalid parameter in %s", __func__);
×
UNCOV
1932
    return;
×
1933
  }
1934
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
1935
  taosHashCleanup(pRawCache->pMetaHash);
×
1936
  taosHashCleanup(pRawCache->pNameHash);
×
UNCOV
1937
  taosHashCleanup(pRawCache->pVgHash);
×
1938
}
1939

1940
static int32_t initRawCacheHash() {
15✔
1941
  if (writeRawCache == NULL) {
15!
1942
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
15✔
1943
    if (writeRawCache == NULL) {
15!
UNCOV
1944
      return terrno;
×
1945
    }
1946
    taosHashSetFreeFp(writeRawCache, freeRawCache);
15✔
1947
  }
1948
  return 0;
15✔
1949
}
1950

1951
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
14✔
1952
  if (rawData == NULL || pSW == NULL){
14!
UNCOV
1953
    return false;
×
1954
  }
1955
  if (pTableMeta == NULL) {
14!
1956
    uError("invalid parameter in %s", __func__);
×
UNCOV
1957
    return false;
×
1958
  }
1959
  char* p = (char*)rawData;
14✔
1960
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
1961
  // column length |
1962
  p += sizeof(int32_t);
14✔
1963
  p += sizeof(int32_t);
14✔
1964
  p += sizeof(int32_t);
14✔
1965
  p += sizeof(int32_t);
14✔
1966
  p += sizeof(int32_t);
14✔
1967
  p += sizeof(uint64_t);
14✔
1968
  int8_t* fields = p;
14✔
1969

1970
  if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
14✔
1971
    return true;
5✔
1972
  }
1973

1974
  for (int i = 0; i < pSW->nCols; i++) {
47✔
1975
    int j = 0;
38✔
1976
    for (; j < pTableMeta->tableInfo.numOfColumns; j++) {
100!
1977
      SSchema* pColSchema = &pTableMeta->schema[j];
100✔
1978
      SSchemaExt* pColExtSchema = &pTableMeta->schemaExt[j];
100✔
1979
      char*    fieldName = pSW->pSchema[i].name;
100✔
1980

1981
      if (strcmp(pColSchema->name, fieldName) == 0) {
100✔
1982
        if (checkSchema(pColSchema, pColExtSchema, fields, NULL, 0) != 0){
38!
UNCOV
1983
          return true;
×
1984
        }
1985
        break;
38✔
1986
      }
1987
    }
1988
    fields += sizeof(int8_t) + sizeof(int32_t);
38✔
1989

1990
    if (j == pTableMeta->tableInfo.numOfColumns) return true;
38!
1991
  }
1992
  return false;
9✔
1993
}
1994

1995
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
62✔
1996
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) {
62!
UNCOV
1997
    uError("invalid parameter in %s", __func__);
×
UNCOV
1998
    return TSDB_CODE_INVALID_PARA;
×
1999
  }
2000
  int32_t code = 0;
62✔
2001
  void*   cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
62✔
2002
  if (cacheInfo == NULL) {
62✔
2003
    *pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
60✔
2004
    RAW_NULL_CHECK(*pVgHash);
60!
2005
    *pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
60✔
2006
    RAW_NULL_CHECK(*pNameHash);
60!
2007
    *pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
60✔
2008
    RAW_NULL_CHECK(*pMetaHash);
60!
2009
    taosHashSetFreeFp(*pMetaHash, tmqFreeMeta);
60✔
2010
    rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash};
60✔
2011
    RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));
60!
2012
  } else {
2013
    rawCacheInfo* info = (rawCacheInfo*)cacheInfo;
2✔
2014
    *pVgHash = info->pVgHash;
2✔
2015
    *pNameHash = info->pNameHash;
2✔
2016
    *pMetaHash = info->pMetaHash;
2✔
2017
  }
2018

2019
  return 0;
62✔
2020
end:
×
2021
  taosHashCleanup(*pMetaHash);
×
2022
  taosHashCleanup(*pNameHash);
×
UNCOV
2023
  taosHashCleanup(*pVgHash);
×
UNCOV
2024
  return code;
×
2025
}
2026

2027
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
62✔
2028
  if (taos == NULL || pRequest == NULL || pCatalog == NULL || conn == NULL) {
62!
UNCOV
2029
    uError("invalid parameter in %s", __func__);
×
UNCOV
2030
    return TSDB_CODE_INVALID_PARA;
×
2031
  }
2032
  int32_t code = 0;
62✔
2033
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));
62!
2034
  (*pRequest)->syncQuery = true;
62✔
2035
  if (!(*pRequest)->pDb) {
62!
UNCOV
2036
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
2037
    goto end;
×
2038
  }
2039

2040
  RAW_RETURN_CHECK(catalogGetHandle((*pRequest)->pTscObj->pAppInfo->clusterId, pCatalog));
62!
2041
  conn->pTrans = (*pRequest)->pTscObj->pAppInfo->pTransporter;
62✔
2042
  conn->requestId = (*pRequest)->requestId;
62✔
2043
  conn->requestObjRefId = (*pRequest)->self;
62✔
2044
  conn->mgmtEps = getEpSet_s(&(*pRequest)->pTscObj->pAppInfo->mgmtEp);
62✔
2045

2046
end:
62✔
2047
  return code;
62✔
2048
}
2049

2050
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
2051
static int32_t  decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func,
62✔
2052
                              SMqRspObj* rspObj) {
2053
  if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) {
62!
UNCOV
2054
    uError("invalid parameter in %s", __func__);
×
UNCOV
2055
    return TSDB_CODE_INVALID_PARA;
×
2056
  }
2057
  int8_t dataVersion = *(int8_t*)data;
62✔
2058
  if (dataVersion >= MQ_DATA_RSP_VERSION) {
62✔
2059
    data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
45✔
2060
    if (dataLen < sizeof(int8_t) + sizeof(int32_t)) {
45!
UNCOV
2061
      return TSDB_CODE_INVALID_PARA;
×
2062
    }
2063
    dataLen -= sizeof(int8_t) + sizeof(int32_t);
45✔
2064
  }
2065

2066
  rspObj->resIter = -1;
62✔
2067
  tDecoderInit(decoder, data, dataLen);
62✔
2068
  int32_t code = func(decoder, &rspObj->dataRsp);
62✔
2069
  if (code != 0) {
62!
UNCOV
2070
    SET_ERROR_MSG("decode mq taosx data rsp failed");
×
2071
  }
2072
  return code;
62✔
2073
}
2074

2075
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
145✔
2076
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
2077
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
2078
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
145!
2079
      pMeta == NULL) {
UNCOV
2080
    uError("invalid parameter in %s", __func__);
×
UNCOV
2081
    return TSDB_CODE_INVALID_PARA;
×
2082
  }
2083
  int32_t     code = 0;
145✔
2084
  STableMeta* pTableMeta = NULL;
145✔
2085
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
145✔
2086
  if (tmpInfo == NULL || retry > 0) {
145!
2087
    tbInfo info = {0};
131✔
2088

2089
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
131!
2090
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
131!
2091
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
19✔
2092
    }
2093
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
131!
2094
    info.uid = pTableMeta->uid;
131✔
2095
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
131✔
2096
      info.suid = pTableMeta->suid;
94✔
2097
    } else {
2098
      info.suid = pTableMeta->uid;
37✔
2099
    }
2100
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
131✔
2101
    if (code != 0) {
131!
UNCOV
2102
      taosMemoryFree(pTableMeta);
×
UNCOV
2103
      goto end;
×
2104
    }
2105
    if (pCreateReqDst) {
131✔
2106
      pTableMeta->vgId = info.vgInfo.vgId;
19✔
2107
      pTableMeta->uid = pCreateReqDst->uid;
19✔
2108
      pCreateReqDst->ctb.suid = pTableMeta->suid;
19✔
2109
    }
2110

2111
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
131!
2112
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
131✔
2113
    RAW_RETURN_CHECK(
131!
2114
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
2115
  }
2116

2117
  if (pTableMeta == NULL || retry > 0) {
145!
2118
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
14✔
2119
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
14!
2120
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
5!
2121
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
5✔
2122
      if (code != 0) {
5!
UNCOV
2123
        taosMemoryFree(pTableMeta);
×
UNCOV
2124
        goto end;
×
2125
      }
2126

2127
    } else {
2128
      pTableMeta = *pTableMetaTmp;
9✔
2129
      pTableMeta->uid = tmpInfo->uid;
9✔
2130
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
9✔
2131
    }
2132
  }
2133
  *pMeta = pTableMeta;
145✔
2134

2135
end:
145✔
2136
  return code;
145✔
2137
}
2138

2139
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
35✔
2140
  if (taos == NULL || data == NULL) {
35!
UNCOV
2141
    uError("invalid parameter in %s", __func__);
×
UNCOV
2142
    return TSDB_CODE_INVALID_PARA;
×
2143
  }
2144
  int32_t   code = TSDB_CODE_SUCCESS;
35✔
2145
  SQuery*   pQuery = NULL;
35✔
2146
  SMqRspObj rspObj = {0};
35✔
2147
  SDecoder  decoder = {0};
35✔
2148

2149
  SRequestObj*     pRequest = NULL;
35✔
2150
  SCatalog*        pCatalog = NULL;
35✔
2151
  SRequestConnInfo conn = {0};
35✔
2152
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
35!
2153
  uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
35!
2154
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
35!
2155

2156
  SHashObj* pVgHash = NULL;
35✔
2157
  SHashObj* pNameHash = NULL;
35✔
2158
  SHashObj* pMetaHash = NULL;
35✔
2159
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
35!
2160
  int retry = 0;
35✔
2161
  while (1) {
2162
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
35!
2163
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
35!
2164
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
134✔
2165
      if (!rspObj.dataRsp.withSchema) {
99!
UNCOV
2166
        goto end;
×
2167
      }
2168

2169
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
99✔
2170
      RAW_NULL_CHECK(tbName);
99!
2171
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
99✔
2172
      RAW_NULL_CHECK(pSW);
99!
2173
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
99✔
2174
      RAW_NULL_CHECK(pRetrieve);
99!
2175
      void* rawData = getRawDataFromRes(pRetrieve);
99✔
2176
      RAW_NULL_CHECK(rawData);
99!
2177

2178
      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
99!
2179
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
99✔
2180
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
99✔
2181
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
99✔
2182

2183
      STableMeta* pTableMeta = NULL;
99✔
2184
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, pSW,
99!
2185
                                        rawData, retry));
2186
      char err[ERR_MSG_LEN] = {0};
99✔
2187
      code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
99✔
2188
      if (code != TSDB_CODE_SUCCESS) {
99!
UNCOV
2189
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
×
UNCOV
2190
        goto end;
×
2191
      }
2192
    }
2193
    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
35!
2194
    launchQueryImpl(pRequest, pQuery, true, NULL);
35✔
2195
    code = pRequest->code;
35✔
2196

2197
    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
35!
2198
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
×
2199
      qDestroyQuery(pQuery);
×
2200
      pQuery = NULL;
×
UNCOV
2201
      rspObj.resIter = -1;
×
UNCOV
2202
      continue;
×
2203
    }
2204
    break;
35✔
2205
  }
2206

2207
end:
35✔
2208
  uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
35!
2209
  tDeleteMqDataRsp(&rspObj.dataRsp);
35✔
2210
  tDecoderClear(&decoder);
35✔
2211
  qDestroyQuery(pQuery);
35✔
2212
  destroyRequest(pRequest);
35✔
2213
  return code;
35✔
2214
}
2215

2216
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
10✔
2217
  if (taos == NULL || data == NULL) {
10!
UNCOV
2218
    uError("invalid parameter in %s", __func__);
×
UNCOV
2219
    return TSDB_CODE_INVALID_PARA;
×
2220
  }
2221
  int32_t   code = TSDB_CODE_SUCCESS;
10✔
2222
  SQuery*   pQuery = NULL;
10✔
2223
  SMqRspObj rspObj = {0};
10✔
2224
  SDecoder  decoder = {0};
10✔
2225
  SHashObj* pCreateTbHash = NULL;
10✔
2226

2227
  SRequestObj*     pRequest = NULL;
10✔
2228
  SCatalog*        pCatalog = NULL;
10✔
2229
  SRequestConnInfo conn = {0};
10✔
2230

2231
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
10!
2232
  uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
10!
2233
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));
10!
2234

2235
  pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
10✔
2236
  RAW_NULL_CHECK(pCreateTbHash);
10!
2237
  RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
10!
2238

2239
  SHashObj* pVgHash = NULL;
10✔
2240
  SHashObj* pNameHash = NULL;
10✔
2241
  SHashObj* pMetaHash = NULL;
10✔
2242
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
10!
2243
  int retry = 0;
10✔
2244
  while (1) {
2245
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
10!
2246
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
10!
2247
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
39✔
2248
      if (!rspObj.dataRsp.withSchema) {
29!
UNCOV
2249
        goto end;
×
2250
      }
2251

2252
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
29✔
2253
      RAW_NULL_CHECK(tbName);
29!
2254
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
29✔
2255
      RAW_NULL_CHECK(pSW);
29!
2256
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
29✔
2257
      RAW_NULL_CHECK(pRetrieve);
29!
2258
      void* rawData = getRawDataFromRes(pRetrieve);
29✔
2259
      RAW_NULL_CHECK(rawData);
29!
2260

2261
      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
29!
2262
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
29✔
2263
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
29✔
2264
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
29✔
2265

2266
      // find schema data info
2267
      SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, pName.tname, strlen(pName.tname));
29✔
2268
      STableMeta*    pTableMeta = NULL;
29✔
2269
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &pName,
29!
2270
                                        &pTableMeta, pSW, rawData, retry));
2271
      char err[ERR_MSG_LEN] = {0};
29✔
2272
      code =
2273
          rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
29✔
2274
      if (code != TSDB_CODE_SUCCESS) {
29!
UNCOV
2275
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
×
UNCOV
2276
        goto end;
×
2277
      }
2278
    }
2279
    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
10!
2280
    launchQueryImpl(pRequest, pQuery, true, NULL);
10✔
2281
    code = pRequest->code;
10✔
2282

2283
    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
10!
2284
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
×
2285
      qDestroyQuery(pQuery);
×
2286
      pQuery = NULL;
×
UNCOV
2287
      rspObj.resIter = -1;
×
UNCOV
2288
      continue;
×
2289
    }
2290
    break;
10✔
2291
  }
2292

2293
end:
10✔
2294
  uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
10!
2295
  tDeleteSTaosxRsp(&rspObj.dataRsp);
10✔
2296
  void* pIter = taosHashIterate(pCreateTbHash, NULL);
10✔
2297
  while (pIter) {
29✔
2298
    tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
19✔
2299
    pIter = taosHashIterate(pCreateTbHash, pIter);
19✔
2300
  }
2301
  taosHashCleanup(pCreateTbHash);
10✔
2302
  tDecoderClear(&decoder);
10✔
2303
  qDestroyQuery(pQuery);
10✔
2304
  destroyRequest(pRequest);
10✔
2305
  return code;
10✔
2306
}
2307

2308
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
17✔
2309
  if (taos == NULL || data == NULL) {
17!
UNCOV
2310
    uError("invalid parameter in %s", __func__);
×
UNCOV
2311
    return TSDB_CODE_INVALID_PARA;
×
2312
  }
2313
  int32_t   code = TSDB_CODE_SUCCESS;
17✔
2314
  SQuery*   pQuery = NULL;
17✔
2315
  SHashObj* pVgroupHash = NULL;
17✔
2316
  SMqRspObj rspObj = {0};
17✔
2317
  SDecoder  decoder = {0};
17✔
2318

2319
  SRequestObj*     pRequest = NULL;
17✔
2320
  SCatalog*        pCatalog = NULL;
17✔
2321
  SRequestConnInfo conn = {0};
17✔
2322

2323
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
17!
2324
  uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
17!
2325
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
17!
2326

2327
  SHashObj* pVgHash = NULL;
17✔
2328
  SHashObj* pNameHash = NULL;
17✔
2329
  SHashObj* pMetaHash = NULL;
17✔
2330
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
17!
2331
  int retry = 0;
17✔
UNCOV
2332
  while (1) {
×
2333
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
17!
2334
    uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
17!
2335
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
17✔
2336
    pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
17✔
2337
    RAW_NULL_CHECK(pVgroupHash);
17!
2338
    pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
17✔
2339
    RAW_NULL_CHECK(pStmt->pVgDataBlocks);
17!
2340

2341
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
34✔
2342
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
17✔
2343
      RAW_NULL_CHECK(tbName);
17!
2344
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
17✔
2345
      RAW_NULL_CHECK(pRetrieve);
17!
2346
      void* rawData = getRawDataFromRes(pRetrieve);
17✔
2347
      RAW_NULL_CHECK(rawData);
17!
2348

2349
      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
17!
2350
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
17✔
2351
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
17✔
2352
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
17✔
2353

2354
      // find schema data info
2355
      STableMeta*    pTableMeta = NULL;
17✔
2356
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName,
17!
2357
                                        &pTableMeta, NULL, NULL, retry));
2358
      char err[ERR_MSG_LEN] = {0};
17✔
2359
      code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
17✔
2360
      if (code != TSDB_CODE_SUCCESS) {
17!
UNCOV
2361
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
×
UNCOV
2362
        goto end;
×
2363
      }
2364
    }
2365
    taosHashCleanup(pVgroupHash);
17✔
2366
    pVgroupHash = NULL;
17✔
2367

2368
    RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
17!
2369
    launchQueryImpl(pRequest, pQuery, true, NULL);
17✔
2370
    code = pRequest->code;
17✔
2371

2372
    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
17!
2373
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
×
2374
      qDestroyQuery(pQuery);
×
2375
      pQuery = NULL;
×
UNCOV
2376
      rspObj.resIter = -1;
×
UNCOV
2377
      continue;
×
2378
    }
2379
    break;
17✔
2380
  }
2381

2382
  end:
17✔
2383
  uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
17!
2384
  tDeleteMqDataRsp(&rspObj.dataRsp);
17✔
2385
  tDecoderClear(&decoder);
17✔
2386
  qDestroyQuery(pQuery);
17✔
2387
  taosHashCleanup(pVgroupHash);
17✔
2388
  destroyRequest(pRequest);
17✔
2389
  return code;
17✔
2390
}
2391

2392
static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
327✔
2393
  if (pMetaRsp == NULL || meta == NULL) {
327!
UNCOV
2394
    uError("invalid parameter in %s", __func__);
×
UNCOV
2395
    return;
×
2396
  }
2397
  if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) {
327✔
2398
    processCreateStb(pMetaRsp, meta);
89✔
2399
  } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_STB) {
238✔
2400
    processAlterStb(pMetaRsp, meta);
55✔
2401
  } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_STB) {
183✔
2402
    processDropSTable(pMetaRsp, meta);
8✔
2403
  } else if (pMetaRsp->resMsgType == TDMT_VND_CREATE_TABLE) {
175✔
2404
    processCreateTable(pMetaRsp, meta);
136✔
2405
  } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_TABLE) {
39✔
2406
    processAlterTable(pMetaRsp, meta);
30✔
2407
  } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_TABLE) {
9✔
2408
    processDropTable(pMetaRsp, meta);
5✔
2409
  } else if (pMetaRsp->resMsgType == TDMT_VND_DELETE) {
4!
2410
    processDeleteTable(pMetaRsp, meta);
4✔
2411
  }
2412
}
2413

2414
static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
15✔
2415
  if (pMsgRsp == NULL || string == NULL) {
15!
UNCOV
2416
    uError("invalid parameter in %s", __func__);
×
2417
    return;
15✔
2418
  }
2419
  SDecoder        coder = {0};
15✔
2420
  SMqBatchMetaRsp rsp = {0};
15✔
2421
  int32_t         code = 0;
15✔
2422
  cJSON*          pJson = NULL;
15✔
2423
  tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
15✔
2424
  if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
15!
UNCOV
2425
    goto end;
×
2426
  }
2427

2428
  pJson = cJSON_CreateObject();
15✔
2429
  RAW_NULL_CHECK(pJson);
15!
2430
  RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION));
15!
2431
  cJSON* pMetaArr = cJSON_CreateArray();
15✔
2432
  RAW_NULL_CHECK(pMetaArr);
15!
2433
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(pJson, "metas", pMetaArr));
15!
2434

2435
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
15✔
2436
  for (int32_t i = 0; i < num; i++) {
143✔
2437
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
128✔
2438
    RAW_NULL_CHECK(len);
128!
2439
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
128✔
2440
    RAW_NULL_CHECK(tmpBuf);
128!
2441
    SDecoder   metaCoder = {0};
128✔
2442
    SMqMetaRsp metaRsp = {0};
128✔
2443
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
128✔
2444
    if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
128!
UNCOV
2445
      goto end;
×
2446
    }
2447
    cJSON* pItem = NULL;
128✔
2448
    processSimpleMeta(&metaRsp, &pItem);
128✔
2449
    tDeleteMqMetaRsp(&metaRsp);
128✔
2450
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(pMetaArr, pItem));
128!
2451
  }
2452

2453
  tDeleteMqBatchMetaRsp(&rsp);
15✔
2454
  char* fullStr = cJSON_PrintUnformatted(pJson);
15✔
2455
  cJSON_Delete(pJson);
15✔
2456
  *string = fullStr;
15✔
2457
  return;
15✔
2458

2459
end:
×
UNCOV
2460
  cJSON_Delete(pJson);
×
UNCOV
2461
  tDeleteMqBatchMetaRsp(&rsp);
×
2462
}
2463

2464
char* tmq_get_json_meta(TAOS_RES* res) {
224✔
2465
  if (res == NULL) {
224!
UNCOV
2466
    uError("invalid parameter in %s", __func__);
×
UNCOV
2467
    return NULL;
×
2468
  }
2469
  uDebug("tmq_get_json_meta res:%p", res);
224!
2470
  if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res) && !TD_RES_TMQ_BATCH_META(res)) {
224!
UNCOV
2471
    return NULL;
×
2472
  }
2473

2474
  char*      string = NULL;
224✔
2475
  SMqRspObj* rspObj = (SMqRspObj*)res;
224✔
2476
  if (TD_RES_TMQ_METADATA(res)) {
224✔
2477
    processAutoCreateTable(&rspObj->dataRsp, &string);
10✔
2478
  } else if (TD_RES_TMQ_BATCH_META(res)) {
214✔
2479
    processBatchMetaToJson(&rspObj->batchMetaRsp, &string);
15✔
2480
  } else if (TD_RES_TMQ_META(res)) {
199!
2481
    cJSON* pJson = NULL;
199✔
2482
    processSimpleMeta(&rspObj->metaRsp, &pJson);
199✔
2483
    string = cJSON_PrintUnformatted(pJson);
199✔
2484
    cJSON_Delete(pJson);
199✔
2485
  } else {
UNCOV
2486
    uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res);
×
2487
  }
2488

2489
  uDebug("tmq_get_json_meta string:%s", string);
224!
2490
  return string;
224✔
2491
}
2492

2493
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
267!
2494

2495
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
45✔
2496
  if (pRsp == NULL) {
45!
UNCOV
2497
    uError("invalid parameter in %s", __func__);
×
UNCOV
2498
    return TSDB_CODE_INVALID_PARA;
×
2499
  }
2500
  SEncoder coder = {0};
45✔
2501
  tEncoderInit(&coder, NULL, 0);
45✔
2502
  if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1;
45!
2503
  if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1;
45!
2504
  int32_t pos = coder.pos;
45✔
2505
  tEncoderClear(&coder);
45✔
2506
  return pos;
45✔
2507
}
2508

2509
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
2510
static int32_t  encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
45✔
2511
  if (raw == NULL || encodeFunc == NULL || rspObj == NULL) {
45!
UNCOV
2512
    uError("invalid parameter in %s", __func__);
×
UNCOV
2513
    return TSDB_CODE_INVALID_PARA;
×
2514
  }
2515
  uint32_t len = 0;
45✔
2516
  int32_t  code = 0;
45✔
2517
  SEncoder encoder = {0};
45✔
2518
  void*    buf = NULL;
45✔
2519
  tEncodeSize(encodeFunc, rspObj, len, code);
45!
2520
  if (code < 0) {
45!
UNCOV
2521
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2522
    goto FAILED;
×
2523
  }
2524
  len += sizeof(int8_t) + sizeof(int32_t);
45✔
2525
  buf = taosMemoryCalloc(1, len);
45!
2526
  if (buf == NULL) {
45!
UNCOV
2527
    code = terrno;
×
UNCOV
2528
    goto FAILED;
×
2529
  }
2530
  tEncoderInit(&encoder, buf, len);
45✔
2531
  if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
45!
UNCOV
2532
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2533
    goto FAILED;
×
2534
  }
2535
  int32_t offsetLen = getOffSetLen(rspObj);
45✔
2536
  if (offsetLen <= 0) {
45!
UNCOV
2537
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2538
    goto FAILED;
×
2539
  }
2540
  if (tEncodeI32(&encoder, offsetLen) < 0) {
45!
UNCOV
2541
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2542
    goto FAILED;
×
2543
  }
2544
  if (encodeFunc(&encoder, rspObj) < 0) {
45!
UNCOV
2545
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
2546
    goto FAILED;
×
2547
  }
2548
  tEncoderClear(&encoder);
45✔
2549

2550
  raw->raw = buf;
45✔
2551
  raw->raw_len = len;
45✔
2552
  return code;
45✔
2553
FAILED:
×
2554
  tEncoderClear(&encoder);
×
UNCOV
2555
  taosMemoryFree(buf);
×
UNCOV
2556
  return code;
×
2557
}
2558

2559
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
301✔
2560
  if (raw == NULL || res == NULL) {
301!
UNCOV
2561
    uError("invalid parameter in %s", __func__);
×
UNCOV
2562
    return TSDB_CODE_INVALID_PARA;
×
2563
  }
2564
  *raw = (tmq_raw_data){0};
301✔
2565
  SMqRspObj* rspObj = ((SMqRspObj*)res);
301✔
2566
  if (TD_RES_TMQ_META(res)) {
301✔
2567
    raw->raw = rspObj->metaRsp.metaRsp;
224✔
2568
    raw->raw_len = rspObj->metaRsp.metaRspLen >= 0 ? rspObj->metaRsp.metaRspLen : 0;
224✔
2569
    raw->raw_type = rspObj->metaRsp.resMsgType;
224✔
2570
    uDebug("tmq get raw type meta:%p", raw);
224!
2571
  } else if (TD_RES_TMQ(res)) {
77✔
2572
    int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw);
35✔
2573
    if (code != 0) {
35!
UNCOV
2574
      uError("tmq get raw type error:%d", terrno);
×
UNCOV
2575
      return code;
×
2576
    }
2577
    raw->raw_type = RES_TYPE__TMQ;
35✔
2578
    uDebug("tmq get raw type data:%p", raw);
35!
2579
  } else if (TD_RES_TMQ_METADATA(res)) {
42✔
2580
    int32_t code = encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw);
10✔
2581
    if (code != 0) {
10!
UNCOV
2582
      uError("tmq get raw type error:%d", terrno);
×
UNCOV
2583
      return code;
×
2584
    }
2585
    raw->raw_type = RES_TYPE__TMQ_METADATA;
10✔
2586
    uDebug("tmq get raw type metadata:%p", raw);
10!
2587
  } else if (TD_RES_TMQ_BATCH_META(res)) {
32✔
2588
    raw->raw = rspObj->batchMetaRsp.pMetaBuff;
15✔
2589
    raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
15✔
2590
    raw->raw_type = rspObj->resType;
15✔
2591
    uDebug("tmq get raw batch meta:%p", raw);
15!
2592
  } else if (TD_RES_TMQ_RAW(res)) {
17!
2593
    raw->raw = rspObj->dataRsp.rawData;
17✔
2594
    rspObj->dataRsp.rawData = NULL;
17✔
2595
    raw->raw_len = rspObj->dataRsp.len;
17✔
2596
    raw->raw_type = rspObj->resType;
17✔
2597
    uDebug("tmq get raw raw:%p", raw);
17!
2598
  } else {
UNCOV
2599
    uError("tmq get raw error type:%d", *(int8_t*)res);
×
UNCOV
2600
    return TSDB_CODE_TMQ_INVALID_MSG;
×
2601
  }
2602
  return TSDB_CODE_SUCCESS;
301✔
2603
}
2604

2605
void tmq_free_raw(tmq_raw_data raw) {
301✔
2606
  uDebug("tmq free raw data type:%d", raw.raw_type);
301!
2607
  if (raw.raw_type == RES_TYPE__TMQ ||
301✔
2608
      raw.raw_type == RES_TYPE__TMQ_METADATA) {
266✔
2609
    taosMemoryFree(raw.raw);
45!
2610
  } else if(raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL){
256!
2611
    taosMemoryFree(POINTER_SHIFT(raw.raw, - sizeof(SMqRspHead)));
17!
2612
  }
2613
  (void)memset(terrMsg, 0, ERR_MSG_LEN);
301✔
2614
}
301✔
2615

2616
static int32_t writeRawInit() {
404✔
2617
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
419✔
2618
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
15✔
2619
    if (old == 0) {
15!
2620
      int32_t code = initRawCacheHash();
15✔
2621
      if (code != 0) {
15!
2622
        uError("tmq writeRawImpl init error:%d", code);
×
UNCOV
2623
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
UNCOV
2624
        return code;
×
2625
      }
2626
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
15✔
2627
    }
2628
  }
2629

2630
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
404!
UNCOV
2631
    return TSDB_CODE_INTERNAL_ERROR;
×
2632
  }
2633
  return 0;
404✔
2634
}
2635

2636
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
404✔
2637
  if (taos == NULL || buf == NULL) {
404!
UNCOV
2638
    uError("invalid parameter in %s", __func__);
×
UNCOV
2639
    return TSDB_CODE_INVALID_PARA;
×
2640
  }
2641
  if (writeRawInit() != 0) {
404!
UNCOV
2642
    return TSDB_CODE_INTERNAL_ERROR;
×
2643
  }
2644

2645
  if (type == TDMT_VND_CREATE_STB) {
404✔
2646
    return taosCreateStb(taos, buf, len);
89✔
2647
  } else if (type == TDMT_VND_ALTER_STB) {
315✔
2648
    return taosCreateStb(taos, buf, len);
55✔
2649
  } else if (type == TDMT_VND_DROP_STB) {
260✔
2650
    return taosDropStb(taos, buf, len);
8✔
2651
  } else if (type == TDMT_VND_CREATE_TABLE) {
252✔
2652
    return taosCreateTable(taos, buf, len);
136✔
2653
  } else if (type == TDMT_VND_ALTER_TABLE) {
116✔
2654
    return taosAlterTable(taos, buf, len);
30✔
2655
  } else if (type == TDMT_VND_DROP_TABLE) {
86✔
2656
    return taosDropTable(taos, buf, len);
5✔
2657
  } else if (type == TDMT_VND_DELETE) {
81✔
2658
    return taosDeleteData(taos, buf, len);
4✔
2659
  } else if (type == RES_TYPE__TMQ_METADATA) {
77✔
2660
    return tmqWriteRawMetaDataImpl(taos, buf, len);
10✔
2661
  } else if (type == RES_TYPE__TMQ_RAWDATA) {
67✔
2662
    return tmqWriteRawRawDataImpl(taos, buf, len);
17✔
2663
  } else if (type == RES_TYPE__TMQ) {
50✔
2664
    return tmqWriteRawDataImpl(taos, buf, len);
35✔
2665
  } else if (type == RES_TYPE__TMQ_BATCH_META) {
15!
2666
    return tmqWriteBatchMetaDataImpl(taos, buf, len);
15✔
2667
  }
UNCOV
2668
  return TSDB_CODE_INVALID_PARA;
×
2669
}
2670

2671
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
290✔
2672
  if (taos == NULL || raw.raw == NULL || raw.raw_len <= 0) {
290!
2673
    SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
14✔
2674
    return TSDB_CODE_INVALID_PARA;
14✔
2675
  }
2676
  taosClearErrMsg(); // clear global error message
276✔
2677

2678
  return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
276✔
2679
}
2680

2681
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) {
15✔
2682
  if (taos == NULL || meta == NULL) {
15!
UNCOV
2683
    uError("invalid parameter in %s", __func__);
×
UNCOV
2684
    return TSDB_CODE_INVALID_PARA;
×
2685
  }
2686
  SMqBatchMetaRsp rsp = {0};
15✔
2687
  SDecoder        coder = {0};
15✔
2688
  int32_t         code = TSDB_CODE_SUCCESS;
15✔
2689

2690
  // decode and process req
2691
  tDecoderInit(&coder, meta, metaLen);
15✔
2692
  if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
15!
UNCOV
2693
    code = TSDB_CODE_INVALID_PARA;
×
UNCOV
2694
    goto end;
×
2695
  }
2696
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
15✔
2697
  for (int32_t i = 0; i < num; i++) {
143✔
2698
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
128✔
2699
    RAW_NULL_CHECK(len);
128!
2700
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
128✔
2701
    RAW_NULL_CHECK(tmpBuf);
128!
2702
    SDecoder   metaCoder = {0};
128✔
2703
    SMqMetaRsp metaRsp = {0};
128✔
2704
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
128✔
2705
    if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
128!
UNCOV
2706
      code = TSDB_CODE_INVALID_PARA;
×
UNCOV
2707
      goto end;
×
2708
    }
2709
    code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
128✔
2710
    tDeleteMqMetaRsp(&metaRsp);
128✔
2711
    if (code != TSDB_CODE_SUCCESS) {
128!
UNCOV
2712
      goto end;
×
2713
    }
2714
  }
2715

2716
end:
15✔
2717
  tDeleteMqBatchMetaRsp(&rsp);
15✔
2718
  return code;
15✔
2719
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc