• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4868

26 Nov 2025 05:46AM UTC coverage: 64.629% (+0.2%) from 64.473%
#4868

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

770 of 945 new or added lines in 33 files covered. (81.48%)

3010 existing lines in 120 files now uncovered.

158425 of 245129 relevant lines covered (64.63%)

113048242.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.28
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "parser.h"
19
#include "tcol.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tdef.h"
23
#include "tglobal.h"
24
#include "tmsgtype.h"
25

26
/*
 * Error-propagation helpers. Each one expects an `int32_t code` variable and an
 * `end:` label in the enclosing function and jumps to `end` on failure.
 * The argument is always parenthesized so that compound expressions such as
 * `a || b` or `cond ? x : y` are tested as a whole.
 */

/* Fails with `code = terrno` when the pointer expression `c` is NULL. */
#define RAW_NULL_CHECK(c) \
  do {                    \
    if ((c) == NULL) {    \
      code = terrno;      \
      goto end;           \
    }                     \
  } while (0)

/* Fails with `code = TSDB_CODE_INVALID_PARA` when the expression `c` is false. */
#define RAW_FALSE_CHECK(c)           \
  do {                               \
    if (!(c)) {                      \
      code = TSDB_CODE_INVALID_PARA; \
      goto end;                      \
    }                                \
  } while (0)

/* Stores the value of `c` in `code` and fails when it is non-zero. */
#define RAW_RETURN_CHECK(c) \
  do {                      \
    code = (c);             \
    if (code != 0) {        \
      goto end;             \
    }                       \
  } while (0)
49

50
// printf-style format fragment with two 64-bit hex fields, labelled connId and QID.
#define LOG_ID_TAG   "connId:0x%" PRIx64 ", QID:0x%" PRIx64
51
// Expands to two arguments: the int64_t read through `taos`, and pRequest->requestId.
// Both `taos` and `pRequest` must be in scope at the point of use.
// NOTE(review): presumably the argument pair for LOG_ID_TAG - confirm at the call sites.
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
52

53
// Version string constant; presumably the version tag of the TMQ meta format -
// TODO confirm where it is emitted (no use is visible in this part of the file).
#define TMQ_META_VERSION "1.0"
54

55
/*
 * Attaches `item` to `object` under the key `string`.
 * When cJSON reports that the item was not attached, the item is deleted here,
 * so the caller does not have to release it on failure.
 * Returns true when the item was attached, false otherwise.
 */
static bool tmqAddJsonObjectItem(cJSON* object, const char* string, cJSON* item) {
  if (cJSON_AddItemToObject(object, string, item)) {
    return true;
  }
  cJSON_Delete(item);
  return false;
}
62
/*
 * Appends `item` to the JSON array `array`.
 * When cJSON reports that the item was not appended, the item is deleted here,
 * so the caller does not have to release it on failure.
 * Returns true when the item was appended, false otherwise.
 */
static bool tmqAddJsonArrayItem(cJSON* array, cJSON* item) {
  if (cJSON_AddItemToArray(array, item)) {
    return true;
  }
  cJSON_Delete(item);
  return false;
}
69

70
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen);
71
/*
 * Offsets `suid` by the MurmurHash3_32 hash of the database name `db`.
 * A NULL `db` leaves the id unchanged.
 */
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
  return (db != NULL) ? suid + MurmurHash3_32(db, strlen(db)) : suid;
}
77
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, SExtSchema* pExtSchemas, char* name, int64_t id, int8_t t,
4,582✔
78
                                 SColCmprWrapper* pColCmprRow, cJSON** pJson) {
79
  if (schemaRow == NULL || name == NULL || pColCmprRow == NULL || pJson == NULL) {
4,582✔
80
    uError("invalid parameter, schemaRow:%p, name:%p, pColCmprRow:%p, pJson:%p", schemaRow, name, pColCmprRow, pJson);
×
81
    return;
×
82
  }
83
  int32_t code = TSDB_CODE_SUCCESS;
4,582✔
84
  int8_t  buildDefaultCompress = 0;
4,582✔
85
  if (pColCmprRow->nCols <= 0) {
4,582✔
86
    buildDefaultCompress = 1;
70✔
87
  }
88

89
  char*  string = NULL;
4,582✔
90
  cJSON* json = cJSON_CreateObject();
4,582✔
91
  RAW_NULL_CHECK(json);
4,582✔
92
  cJSON* type = cJSON_CreateString("create");
4,582✔
93
  RAW_NULL_CHECK(type);
4,582✔
94

95
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
4,582✔
96
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
4,582✔
97
  RAW_NULL_CHECK(tableType);
4,582✔
98
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
4,582✔
99
  cJSON* tableName = cJSON_CreateString(name);
4,582✔
100
  RAW_NULL_CHECK(tableName);
4,582✔
101
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
4,582✔
102

103
  cJSON* columns = cJSON_CreateArray();
4,582✔
104
  RAW_NULL_CHECK(columns);
4,582✔
105
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "columns", columns));
4,582✔
106

107
  for (int i = 0; i < schemaRow->nCols; i++) {
30,714✔
108
    cJSON* column = cJSON_CreateObject();
26,132✔
109
    RAW_NULL_CHECK(column);
26,132✔
110
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(columns, column));
26,132✔
111
    SSchema* s = schemaRow->pSchema + i;
26,132✔
112
    cJSON*   cname = cJSON_CreateString(s->name);
26,132✔
113
    RAW_NULL_CHECK(cname);
26,132✔
114
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "name", cname));
26,132✔
115
    cJSON* ctype = cJSON_CreateNumber(s->type);
26,132✔
116
    RAW_NULL_CHECK(ctype);
26,132✔
117
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "type", ctype));
26,132✔
118
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
28,536✔
119
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
2,404✔
120
      cJSON*  cbytes = cJSON_CreateNumber(length);
2,404✔
121
      RAW_NULL_CHECK(cbytes);
2,404✔
122
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
2,404✔
123
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
23,728✔
124
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
394✔
125
      cJSON*  cbytes = cJSON_CreateNumber(length);
394✔
126
      RAW_NULL_CHECK(cbytes);
394✔
127
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
394✔
128
    } else if (IS_STR_DATA_BLOB(s->type)) {
23,334✔
129
      int32_t length = s->bytes - BLOBSTR_HEADER_SIZE;
×
130
      cJSON*  cbytes = cJSON_CreateNumber(length);
×
131
      RAW_NULL_CHECK(cbytes);
×
132
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
×
133
    } else if (s->type == TSDB_DATA_TYPE_DECIMAL || s->type == TSDB_DATA_TYPE_DECIMAL64) {
23,334✔
134
      int32_t length = pExtSchemas[i].typeMod;
202✔
135
      cJSON*  cbytes = cJSON_CreateNumber(length);
202✔
136
      RAW_NULL_CHECK(cbytes);
202✔
137
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
202✔
138
    }
139
    cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
26,132✔
140
    RAW_NULL_CHECK(isPk);
26,132✔
141
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "isPrimarykey", isPk));
26,132✔
142

143
    if (pColCmprRow == NULL) {
26,132✔
144
      continue;
×
145
    }
146

147
    uint32_t alg = 0;
26,132✔
148
    if (buildDefaultCompress) {
26,132✔
149
      alg = createDefaultColCmprByType(s->type);
140✔
150
    } else {
151
      SColCmpr* pColCmpr = pColCmprRow->pColCmpr + i;
25,992✔
152
      alg = pColCmpr->alg;
25,992✔
153
    }
154
    const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(alg));
26,132✔
155
    RAW_NULL_CHECK(encode);
26,132✔
156
    const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(alg));
26,132✔
157
    RAW_NULL_CHECK(compress);
26,132✔
158
    const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(alg));
26,132✔
159
    RAW_NULL_CHECK(level);
26,132✔
160

161
    cJSON* encodeJson = cJSON_CreateString(encode);
26,132✔
162
    RAW_NULL_CHECK(encodeJson);
26,132✔
163
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "encode", encodeJson));
26,132✔
164

165
    cJSON* compressJson = cJSON_CreateString(compress);
26,132✔
166
    RAW_NULL_CHECK(compressJson);
26,132✔
167
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "compress", compressJson));
26,132✔
168

169
    cJSON* levelJson = cJSON_CreateString(level);
26,132✔
170
    RAW_NULL_CHECK(levelJson);
26,132✔
171
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "level", levelJson));
26,132✔
172
  }
173

174
  cJSON* tags = cJSON_CreateArray();
4,582✔
175
  RAW_NULL_CHECK(tags);
4,582✔
176
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
4,582✔
177

178
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
13,218✔
179
    cJSON* tag = cJSON_CreateObject();
8,636✔
180
    RAW_NULL_CHECK(tag);
8,636✔
181
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
8,636✔
182
    SSchema* s = schemaTag->pSchema + i;
8,636✔
183
    cJSON*   tname = cJSON_CreateString(s->name);
8,636✔
184
    RAW_NULL_CHECK(tname);
8,636✔
185
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
8,636✔
186
    cJSON* ttype = cJSON_CreateNumber(s->type);
8,636✔
187
    RAW_NULL_CHECK(ttype);
8,636✔
188
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
8,636✔
189
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
9,215✔
190
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
579✔
191
      cJSON*  cbytes = cJSON_CreateNumber(length);
579✔
192
      RAW_NULL_CHECK(cbytes);
579✔
193
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
579✔
194
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
8,057✔
195
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
2,404✔
196
      cJSON*  cbytes = cJSON_CreateNumber(length);
2,404✔
197
      RAW_NULL_CHECK(cbytes);
2,404✔
198
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
2,404✔
199
    } else if (IS_STR_DATA_BLOB(s->type)) {
5,653✔
200
      int32_t length = s->bytes - BLOBSTR_HEADER_SIZE;
×
201
      cJSON*  cbytes = cJSON_CreateNumber(length);
×
202
      RAW_NULL_CHECK(cbytes);
×
203
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
×
204
    }
205
  }
206

207
end:
4,582✔
208
  *pJson = json;
4,582✔
209
}
210

211
/*
 * Adds one compression option, decoded from `para`, to `json`.
 *
 * The fields of `para` are examined in the order encode (L1), compress (L2) and
 * level; only the FIRST non-zero one is emitted, under the key "encode",
 * "compress" or "level" respectively. When all three are zero nothing is added.
 * NOTE(review): emitting a single option looks deliberate for "update compress"
 * requests but drops compress/level for "add column with compress option" -
 * confirm against the consumers of this JSON before changing it.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when `json` is NULL or the item
 * cannot be attached, and terrno when a name lookup or allocation yields NULL.
 */
static int32_t setCompressOption(cJSON* json, uint32_t para) {
  if (json == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  const char* key = NULL;
  const char* text = NULL;

  uint8_t encode = COMPRESS_L1_TYPE_U32(para);
  if (encode != 0) {
    key = "encode";
    text = columnEncodeStr(encode);
  } else {
    uint8_t compress = COMPRESS_L2_TYPE_U32(para);
    if (compress != 0) {
      key = "compress";
      text = columnCompressStr(compress);
    } else {
      uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para);
      if (level == 0) {
        return 0;  // nothing to report
      }
      key = "level";
      text = columnLevelStr(level);
    }
  }

  if (text == NULL) {
    return terrno;
  }
  cJSON* item = cJSON_CreateString(text);
  if (item == NULL) {
    return terrno;
  }
  if (!tmqAddJsonObjectItem(json, key, item)) {
    return TSDB_CODE_INVALID_PARA;
  }
  return 0;
}
247
/* Creates a JSON string from `value` and attaches it to `json` under `key`. */
static bool tmqAlterAddString(cJSON* json, const char* key, const char* value) {
  cJSON* item = cJSON_CreateString(value);
  return item != NULL && tmqAddJsonObjectItem(json, key, item);
}

/* Creates a JSON number from `value` and attaches it to `json` under `key`. */
static bool tmqAlterAddNumber(cJSON* json, const char* key, double value) {
  cJSON* item = cJSON_CreateNumber(value);
  return item != NULL && tmqAddJsonObjectItem(json, key, item);
}

/*
 * Adds "colName" and "colType" for an altered field, plus "colLength" when the
 * type is BINARY / VARBINARY / GEOMETRY (bytes - VARSTR_HEADER_SIZE), NCHAR
 * ((bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) or, only when `withBlob` is
 * set, a blob type (bytes - BLOBSTR_HEADER_SIZE).
 */
static bool tmqAlterAddTypedField(cJSON* json, const char* name, int32_t type, int32_t bytes, bool withBlob) {
  if (!tmqAlterAddString(json, "colName", name) || !tmqAlterAddNumber(json, "colType", type)) {
    return false;
  }
  int32_t length = 0;
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_GEOMETRY) {
    length = bytes - VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_NCHAR) {
    length = (bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  } else if (withBlob && IS_STR_DATA_BLOB(type)) {
    length = bytes - BLOBSTR_HEADER_SIZE;
  } else {
    return true;
  }
  return tmqAlterAddNumber(json, "colLength", length);
}

/*
 * Deserializes an SMAlterStbReq from `alterData` and renders it as an "alter"
 * JSON description of a super table.
 *
 * Common members: type: "alter", tableType: "super", tableName, alterType.
 * Depending on alterType the object additionally carries colName, colType,
 * colLength, colNewName and/or one compression option.
 *
 * alterData / alterDataLen  serialized request
 * pJson                     receives the finished object, or NULL when the
 *                           request cannot be deserialized or building fails.
 *                           It is left untouched when a parameter is NULL.
 */
static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) {
  if (alterData == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SMAlterStbReq req = {0};
  cJSON*        json = NULL;
  int32_t       code = 0;
  bool          built = false;  // set only once every member has been attached

  RAW_RETURN_CHECK(tDeserializeSMAlterStbReq(alterData, alterDataLen, &req));

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  RAW_FALSE_CHECK(tmqAlterAddString(json, "type", "alter"));
  SName name = {0};
  RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
  RAW_FALSE_CHECK(tmqAlterAddString(json, "tableType", "super"));
  RAW_FALSE_CHECK(tmqAlterAddString(json, "tableName", name.tname));
  RAW_FALSE_CHECK(tmqAlterAddNumber(json, "alterType", req.alterType));

  switch (req.alterType) {
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      RAW_FALSE_CHECK(tmqAlterAddTypedField(json, field->name, field->type, field->bytes, true));
      break;
    }
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      SFieldWithOptions* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      RAW_FALSE_CHECK(tmqAlterAddTypedField(json, field->name, field->type, field->bytes, true));
      RAW_RETURN_CHECK(setCompressOption(json, field->compress));
      break;
    }
    case TSDB_ALTER_TABLE_DROP_TAG:
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      RAW_FALSE_CHECK(tmqAlterAddString(json, "colName", field->name));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      // Blob types get no colLength here, unlike the add-column cases.
      RAW_FALSE_CHECK(tmqAlterAddTypedField(json, field->name, field->type, field->bytes, false));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(oldField);
      TAOS_FIELD* newField = taosArrayGet(req.pFields, 1);
      RAW_NULL_CHECK(newField);
      RAW_FALSE_CHECK(tmqAlterAddString(json, "colName", oldField->name));
      RAW_FALSE_CHECK(tmqAlterAddString(json, "colNewName", newField->name));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      RAW_FALSE_CHECK(tmqAlterAddString(json, "colName", field->name));
      // field->bytes is handed to setCompressOption as the option word for this
      // request type. NOTE(review): presumably the serializer stores it there - confirm.
      RAW_RETURN_CHECK(setCompressOption(json, field->bytes));
      break;
    }
    default:
      break;
  }

  built = true;

end:
  if (!built) {
    // Never hand a half-built description to the caller.
    uError("failed to build alter stable json, code:0x%x", code);
    cJSON_Delete(json);
    json = NULL;
  }
  tFreeSMAltertbReq(&req);
  *pJson = json;
}
404

405
/*
 * Decodes the SVCreateStbReq that follows the SMsgHead in `metaRsp` and renders
 * it through buildCreateTableJson() as a super-table description into `*pJson`.
 * `*pJson` is not written when a parameter is NULL or decoding fails.
 */
static void processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }

  uDebug("create stable data:%p", metaRsp);

  // The payload starts right after the message head.
  SVCreateStbReq stbReq = {0};
  SDecoder       dec = {0};
  void*          payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t        payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&dec, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&dec, &stbReq) >= 0) {
    buildCreateTableJson(&stbReq.schemaRow, &stbReq.schemaTag, stbReq.pExtSchemas, stbReq.name, stbReq.suid,
                         TSDB_SUPER_TABLE, &stbReq.colCmpr, pJson);
  }

  uDebug("create stable return");
  tDecoderClear(&dec);
}
428

429
/*
 * Decodes the SVCreateStbReq that follows the SMsgHead in `metaRsp` and passes
 * its alterOriData / alterOriDataLen to buildAlterSTableJson(), which fills `*pJson`.
 * `*pJson` is not written when a parameter is NULL or decoding fails.
 */
static void processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }

  SVCreateStbReq stbReq = {0};
  SDecoder       dec = {0};
  uDebug("alter stable data:%p", metaRsp);

  // The payload starts right after the message head.
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&dec, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&dec, &stbReq) >= 0) {
    buildAlterSTableJson(stbReq.alterOriData, stbReq.alterOriDataLen, pJson);
  }

  uDebug("alter stable return");
  tDecoderClear(&dec);
}
452

453
/*
 * Fills `json` with the description of one child table taken from `pCreateReq`:
 *   tableName, using (super table name), tagNum and a "tags" array.
 *
 * A JSON-typed tag produces a single array entry {name, type, value} whose value
 * is the text returned by parseTagDatatoJson(); an empty JSON tag (nTag == 0)
 * produces no entry. Otherwise every tag value yields one entry {name, type, value},
 * where variable-length values are rendered as strings through dataConverToStr()
 * and all other values as numbers.
 *
 * The function reports nothing to its caller: on the first failure (and on a
 * blob-typed tag value) it stops and leaves in `json` whatever was added so far.
 */
static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
  if (json == NULL || pCreateReq == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  STag*   pTag = (STag*)pCreateReq->ctb.pTag;
  SArray* tagNames = pCreateReq->ctb.tagName;
  uint8_t tagCount = pCreateReq->ctb.tagNum;
  SArray* tagVals = NULL;
  char*   jsonText = NULL;
  int32_t code = 0;

  cJSON* tableName = cJSON_CreateString(pCreateReq->name);
  RAW_NULL_CHECK(tableName);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
  cJSON* using = cJSON_CreateString(pCreateReq->ctb.stbName);
  RAW_NULL_CHECK(using);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "using", using));
  cJSON* tagNumJson = cJSON_CreateNumber(tagCount);
  RAW_NULL_CHECK(tagNumJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tagNum", tagNumJson));

  cJSON* tags = cJSON_CreateArray();
  RAW_NULL_CHECK(tags);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
  RAW_RETURN_CHECK(tTagToValArray(pTag, &tagVals));

  if (tTagIsJson(pTag)) {
    // JSON tag: the whole tag set is rendered as one string value.
    if (pTag->nTag == 0) {
      uError("p->nTag == 0");
      goto end;
    }
    parseTagDatatoJson(pTag, &jsonText, NULL);
    RAW_NULL_CHECK(jsonText);
    cJSON* tag = cJSON_CreateObject();
    RAW_NULL_CHECK(tag);
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
    STagVal* firstVal = taosArrayGet(tagVals, 0);
    RAW_NULL_CHECK(firstVal);
    char* firstName = taosArrayGet(tagNames, 0);
    RAW_NULL_CHECK(firstName);
    cJSON* tname = cJSON_CreateString(firstName);
    RAW_NULL_CHECK(tname);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
    cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
    RAW_NULL_CHECK(ttype);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
    cJSON* tvalue = cJSON_CreateString(jsonText);
    RAW_NULL_CHECK(tvalue);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
  } else {
    for (int i = 0; i < taosArrayGetSize(tagVals); i++) {
      STagVal* val = (STagVal*)taosArrayGet(tagVals, i);
      RAW_NULL_CHECK(val);
      cJSON* tag = cJSON_CreateObject();
      RAW_NULL_CHECK(tag);
      RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
      char* tagNameStr = taosArrayGet(tagNames, i);
      RAW_NULL_CHECK(tagNameStr);
      cJSON* tname = cJSON_CreateString(tagNameStr);
      RAW_NULL_CHECK(tname);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
      cJSON* ttype = cJSON_CreateNumber(val->type);
      RAW_NULL_CHECK(ttype);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));

      cJSON* tvalue = NULL;
      if (!IS_VAR_DATA_TYPE(val->type)) {
        double num = 0;
        // currently tag type can't be decimal, so pass 0 as typeMod
        GET_TYPED_DATA(num, double, val->type, &val->i64, 0);
        tvalue = cJSON_CreateNumber(num);
        RAW_NULL_CHECK(tvalue);
      } else {
        if (IS_STR_DATA_BLOB(val->type)) {
          goto end;
        }
        // VARBINARY gets room for two characters per byte plus two extra;
        // both sizes include three spare bytes.
        int64_t bufSize = (val->type == TSDB_DATA_TYPE_VARBINARY) ? val->nData * 2 + 2 + 3 : val->nData + 3;
        char*   text = taosMemoryCalloc(bufSize, 1);
        RAW_NULL_CHECK(text);
        if (dataConverToStr(text, bufSize, val->type, val->pData, val->nData, NULL) != TSDB_CODE_SUCCESS) {
          taosMemoryFree(text);
          goto end;
        }

        tvalue = cJSON_CreateString(text);
        taosMemoryFree(text);
        RAW_NULL_CHECK(tvalue);
      }

      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
    }
  }

end:
  taosMemoryFree(jsonText);
  taosArrayDestroy(tagVals);
}
560

561
/*
 * Builds the "create" JSON description for one or more child tables.
 *
 * Layout: type: "create", tableType: "child", then the members written by
 * buildChildElement() for the first request, then "createList". "createList"
 * holds one object per request (the first one included) when nReqs > 1 and is
 * an empty array otherwise.
 *
 * pCreateReq  array of `nReqs` create requests
 * nReqs       number of requests in `pCreateReq`
 * pJson       receives the finished object, or NULL when one of the steps
 *             checked here fails. Failures inside buildChildElement() are not
 *             reported by it and therefore cannot be detected here.
 *             It is left untouched when a parameter is NULL.
 */
static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) {
  if (pJson == NULL || pCreateReq == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  int32_t code = 0;
  bool    built = false;  // set only once every member has been attached
  cJSON*  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("create");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));

  cJSON* tableType = cJSON_CreateString("child");
  RAW_NULL_CHECK(tableType);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));

  buildChildElement(json, pCreateReq);
  cJSON* createList = cJSON_CreateArray();
  RAW_NULL_CHECK(createList);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "createList", createList));

  for (int i = 0; nReqs > 1 && i < nReqs; i++) {
    cJSON* create = cJSON_CreateObject();
    RAW_NULL_CHECK(create);
    buildChildElement(create, pCreateReq + i);
    // tmqAddJsonArrayItem() deletes `create` itself if it cannot be appended.
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(createList, create));
  }

  built = true;

end:
  if (!built) {
    // Never hand a half-built description to the caller.
    uError("failed to build create child table json, code:0x%x", code);
    cJSON_Delete(json);
    json = NULL;
  }
  *pJson = json;
}
593

594
/*
 * Decodes the SVCreateTbBatchReq that follows the SMsgHead in `metaRsp` and
 * renders it into `*pJson`. The type of the FIRST request selects the builder:
 * child tables go to buildCreateCTableJson() with the whole batch, a normal
 * table goes to buildCreateTableJson() with that first request only.
 * `*pJson` is not written when decoding fails, the batch is empty, or the first
 * request has any other type.
 */
static void processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDecoder           dec = {0};
  SVCreateTbBatchReq batch = {0};

  // decode
  uDebug("create table data:%p", metaRsp);
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&dec, payload, payloadLen);

  if (tDecodeSVCreateTbBatchReq(&dec, &batch) >= 0 && batch.nReqs > 0) {
    SVCreateTbReq* first = batch.pReqs;
    if (first->type == TSDB_CHILD_TABLE) {
      buildCreateCTableJson(batch.pReqs, batch.nReqs, pJson);
    } else if (first->type == TSDB_NORMAL_TABLE) {
      buildCreateTableJson(&first->ntb.schemaRow, NULL, first->pExtSchemas, first->name, first->uid, TSDB_NORMAL_TABLE,
                           &first->colCmpr, pJson);
    }
  }

  uDebug("create table return");
  tDeleteSVCreateTbBatchReq(&batch);
  tDecoderClear(&dec);
}
627

628
/*
 * Decodes every auto-create-table request carried by `rsp` and stores the
 * unformatted JSON text describing them in `*string`.
 *
 * All rsp->createTableNum requests are decoded first; each must be a child or a
 * normal table. The type of the first request then selects the builder: a normal
 * table is rendered from that first request alone, child tables are rendered
 * from the whole set. On any failure `*string` is left as the caller set it.
 *
 * The decoders and request array allocated here are released before returning;
 * for each request its `comment` is freed and, for child tables, its tagName array.
 */
static void processAutoCreateTable(SMqDataRsp* rsp, char** string) {
  if (rsp == NULL || string == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDecoder*      decoders = NULL;
  SVCreateTbReq* reqs = NULL;
  int32_t        code = 0;
  uDebug("auto create table data:%p", rsp);
  if (rsp->createTableNum <= 0) {
    uError("processAutoCreateTable rsp->createTableNum <= 0");
    goto end;
  }

  decoders = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
  RAW_NULL_CHECK(decoders);
  reqs = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
  RAW_NULL_CHECK(reqs);

  // decode every request, one decoder per request
  for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) {
    void** data = taosArrayGet(rsp->createTableReq, iReq);
    RAW_NULL_CHECK(data);
    int32_t* len = taosArrayGet(rsp->createTableLen, iReq);
    RAW_NULL_CHECK(len);

    SDecoder*      dec = decoders + iReq;
    SVCreateTbReq* req = reqs + iReq;
    tDecoderInit(dec, *data, *len);
    if (tDecodeSVCreateTbReq(dec, req) < 0) {
      goto end;
    }

    if (req->type != TSDB_CHILD_TABLE && req->type != TSDB_NORMAL_TABLE) {
      uError("processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE");
      goto end;
    }
  }

  cJSON* pJson = NULL;
  if (reqs->type == TSDB_NORMAL_TABLE) {
    buildCreateTableJson(&reqs->ntb.schemaRow, NULL, reqs->pExtSchemas, reqs->name, reqs->uid, TSDB_NORMAL_TABLE,
                         &reqs->colCmpr, &pJson);
  } else if (reqs->type == TSDB_CHILD_TABLE) {
    buildCreateCTableJson(reqs, rsp->createTableNum, &pJson);
  }

  *string = cJSON_PrintUnformatted(pJson);
  cJSON_Delete(pJson);

end:
  uDebug("auto created table return, sql json:%s", *string);
  // Both arrays are zero-initialised, so entries that were never decoded are
  // still visited here.
  if (decoders != NULL && reqs != NULL) {
    for (int i = 0; i < rsp->createTableNum; i++) {
      tDecoderClear(&decoders[i]);
      taosMemoryFreeClear(reqs[i].comment);
      if (reqs[i].type == TSDB_CHILD_TABLE) {
        taosArrayDestroy(reqs[i].ctb.tagName);
      }
    }
  }
  taosMemoryFree(decoders);
  taosMemoryFree(reqs);
}
687

688
/*
 * Append a "colLength" member to `json` for variable-length column types.
 *
 * BINARY / VARBINARY / GEOMETRY report `colBytes` minus VARSTR_HEADER_SIZE;
 * NCHAR additionally divides by TSDB_NCHAR_SIZE. Any other type adds nothing.
 *
 * Returns 0 on success (including the "nothing to add" case), otherwise the
 * code set by RAW_NULL_CHECK / RAW_FALSE_CHECK.
 */
static int32_t alterTableAddColLength(cJSON* json, int32_t colType, int32_t colBytes) {
  int32_t code = 0;
  int32_t length = 0;

  if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_VARBINARY ||
      colType == TSDB_DATA_TYPE_GEOMETRY) {
    length = colBytes - VARSTR_HEADER_SIZE;
  } else if (colType == TSDB_DATA_TYPE_NCHAR) {
    length = (colBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  } else {
    return code;
  }

  cJSON* cbytes = cJSON_CreateNumber(length);
  RAW_NULL_CHECK(cbytes);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));

end:
  return code;
}

/*
 * Decode an SVAlterTbReq from a meta response and describe it as a cJSON object.
 *
 * The request payload is read from metaRsp->metaRsp, skipping the leading SMsgHead.
 * The resulting object always carries "type" ("alter"), "tableType" ("child" for
 * tag-value updates, "normal" otherwise), "tableName" and "alterType"; further
 * members depend on the alter action.
 *
 * @param metaRsp  meta response holding the encoded request; must not be NULL
 * @param pJson    receives the built object; must not be NULL. It is set to NULL
 *                 when decoding fails or the root object cannot be created. If a
 *                 later step fails, the partially filled object is still stored.
 */
static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  cJSON*       json = NULL;
  int32_t      code = 0;  // written by the RAW_* check macros

  uDebug("alter table data:%p", metaRsp);
  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
    uError("tDecodeSVAlterTbReq error");
    goto end;
  }

  // members common to every alter action
  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("alter");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
                                                vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL
                                            ? "child"
                                            : "normal");
  RAW_NULL_CHECK(tableType);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
  RAW_NULL_CHECK(tableName);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
  RAW_NULL_CHECK(alterType);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));

  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      RAW_NULL_CHECK(colType);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
      RAW_RETURN_CHECK(alterTableAddColLength(json, vAlterTbReq.type, vAlterTbReq.bytes));
      break;
    }
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      RAW_NULL_CHECK(colType);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
      RAW_RETURN_CHECK(alterTableAddColLength(json, vAlterTbReq.type, vAlterTbReq.bytes));
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      // this action carries the type and size in colModType / colModBytes
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
      RAW_NULL_CHECK(colType);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
      RAW_RETURN_CHECK(alterTableAddColLength(json, vAlterTbReq.colModType, vAlterTbReq.colModBytes));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
      RAW_NULL_CHECK(colNewName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
      RAW_NULL_CHECK(tagName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", tagName));

      bool isNull = vAlterTbReq.isNull;
      if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        // a JSON tag without any entries is reported as NULL
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag->nTag == 0) isNull = true;
      }
      if (!isNull) {
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          if (!tTagIsJson(vAlterTbReq.pTagVal)) {
            uError("processAlterTable isJson false");
            goto end;
          }
          parseTagDatatoJson(vAlterTbReq.pTagVal, &buf, NULL);
          if (buf == NULL) {
            uError("parseTagDatatoJson failed, buf == NULL");
            goto end;
          }
        } else {
          int64_t bufSize = 0;
          if (vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY) {
            bufSize = vAlterTbReq.nTagVal * 2 + 2 + 3;
          } else {
            bufSize = vAlterTbReq.nTagVal + 32;
          }
          buf = taosMemoryCalloc(bufSize, 1);
          RAW_NULL_CHECK(buf);
          if (dataConverToStr(buf, bufSize, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL) !=
              TSDB_CODE_SUCCESS) {
            taosMemoryFree(buf);
            goto end;
          }
        }

        cJSON* colValue = cJSON_CreateString(buf);
        taosMemoryFree(buf);
        RAW_NULL_CHECK(colValue);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValue", colValue));
      }

      cJSON* isNullCJson = cJSON_CreateBool(isNull);
      RAW_NULL_CHECK(isNullCJson);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValueNull", isNullCJson));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: {
      int32_t nTags = taosArrayGetSize(vAlterTbReq.pMultiTag);
      if (nTags <= 0) {
        uError("processAlterTable parse multi tags error");
        goto end;
      }

      cJSON* tags = cJSON_CreateArray();
      RAW_NULL_CHECK(tags);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));

      for (int32_t i = 0; i < nTags; i++) {
        cJSON* member = cJSON_CreateObject();
        RAW_NULL_CHECK(member);
        RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, member));

        SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
        cJSON*             tagName = cJSON_CreateString(pTagVal->tagName);
        RAW_NULL_CHECK(tagName);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colName", tagName));

        // JSON tags are rejected in the multi-tag form
        if (pTagVal->tagType == TSDB_DATA_TYPE_JSON) {
          uError("processAlterTable isJson false");
          goto end;
        }
        bool isNull = pTagVal->isNull;
        if (!isNull) {
          int64_t bufSize = 0;
          if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
            bufSize = pTagVal->nTagVal * 2 + 2 + 3;
          } else {
            // Same headroom as the single-tag path: the textual form of a
            // numeric value can be much longer than its binary size.
            bufSize = pTagVal->nTagVal + 32;
          }
          char* buf = taosMemoryCalloc(bufSize, 1);
          RAW_NULL_CHECK(buf);
          if (dataConverToStr(buf, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL) !=
              TSDB_CODE_SUCCESS) {
            taosMemoryFree(buf);
            goto end;
          }
          cJSON* colValue = cJSON_CreateString(buf);
          taosMemoryFree(buf);
          RAW_NULL_CHECK(colValue);
          RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValue", colValue));
        }
        cJSON* isNullCJson = cJSON_CreateBool(isNull);
        RAW_NULL_CHECK(isNullCJson);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValueNull", isNullCJson));
      }
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      break;
    }
    default:
      break;
  }

end:
  uDebug("alter table return");
  // the multi-tag array is the only decoded member released explicitly here
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
    taosArrayDestroy(vAlterTbReq.pMultiTag);
  }
  tDecoderClear(&decoder);
  *pJson = json;
}
930

931
/*
 * Decode an SVDropStbReq from a meta response and describe it as a cJSON object
 * of the form {"type":"drop","tableType":"super","tableName":<name>}.
 *
 * @param metaRsp  meta response holding the encoded request; must not be NULL
 * @param pJson    receives the built object; must not be NULL. Set to NULL when
 *                 decoding or root creation fails; may hold a partially filled
 *                 object if a later step fails.
 */
static void processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }

  int32_t      code = 0;  // written by the RAW_* check macros
  cJSON*       root = NULL;
  SVDropStbReq dropReq = {0};
  SDecoder     dec = {0};

  uDebug("processDropSTable data:%p", metaRsp);

  // the encoded request starts right after the message head
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  tDecoderInit(&dec, body, bodyLen);
  if (tDecodeSVDropStbReq(&dec, &dropReq) < 0) {
    uError("tDecodeSVDropStbReq failed");
    goto end;
  }

  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);

  cJSON* typeItem = cJSON_CreateString("drop");
  RAW_NULL_CHECK(typeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "type", typeItem));

  cJSON* tableTypeItem = cJSON_CreateString("super");
  RAW_NULL_CHECK(tableTypeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableType", tableTypeItem));

  cJSON* tableNameItem = cJSON_CreateString(dropReq.name);
  RAW_NULL_CHECK(tableNameItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableName", tableNameItem));

end:
  uDebug("processDropSTable return");
  tDecoderClear(&dec);
  *pJson = root;
}
969
/*
 * Decode an SDeleteRes from a meta response and describe it as a cJSON object
 * of the form {"type":"delete","sql":"delete from `tb` where `ts` >= s and `ts` <= e"}.
 *
 * @param metaRsp  meta response holding the encoded delete result; must not be NULL
 * @param pJson    receives the built object; must not be NULL. Set to NULL when
 *                 decoding fails, when the SQL text does not fit its buffer, or
 *                 when the root object cannot be created; may hold a partially
 *                 filled object if a later step fails.
 */
static void processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  cJSON*     json = NULL;
  int32_t    code = 0;  // written by the RAW_* check macros

  uDebug("processDeleteTable data:%p", metaRsp);
  // decode and process req
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);

  tDecoderInit(&coder, data, len);
  if (tDecodeDeleteRes(&coder, &req) < 0) {
    uError("tDecodeDeleteRes failed");
    goto end;
  }

  //  getTbName(req.tableFName);
  // NOTE(review): sized generously for one table name, two column names and two
  // 64-bit numbers; confirm against the maximum lengths of tableFName/tsColName.
  char    sql[512] = {0};
  int32_t sqlLen = snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                            req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  // never hand out a silently truncated (and therefore invalid) statement
  if (sqlLen < 0 || sqlLen >= (int32_t)sizeof(sql)) {
    uError("processDeleteTable sql truncated, need:%d cap:%d", sqlLen, (int32_t)sizeof(sql));
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("delete");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
  cJSON* sqlJson = cJSON_CreateString(sql);
  RAW_NULL_CHECK(sqlJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "sql", sqlJson));

end:
  uDebug("processDeleteTable return");
  tDecoderClear(&coder);
  *pJson = json;
}
1009

1010
/*
 * Decode an SVDropTbBatchReq from a meta response and describe it as a cJSON
 * object of the form {"type":"drop","tableNameList":[<name>, ...]}, with one
 * array entry per request in the batch.
 *
 * @param metaRsp  meta response holding the encoded batch; must not be NULL
 * @param pJson    receives the built object; must not be NULL. Set to NULL when
 *                 decoding or root creation fails; may hold a partially filled
 *                 object if a later step fails.
 */
static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }

  int32_t          code = 0;  // written by the RAW_* check macros
  cJSON*           root = NULL;
  SVDropTbBatchReq batch = {0};
  SDecoder         dec = {0};

  uDebug("processDropTable data:%p", metaRsp);

  // the encoded batch starts right after the message head
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  tDecoderInit(&dec, body, bodyLen);
  if (tDecodeSVDropTbBatchReq(&dec, &batch) < 0) {
    uError("tDecodeSVDropTbBatchReq failed");
    goto end;
  }

  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);

  cJSON* typeItem = cJSON_CreateString("drop");
  RAW_NULL_CHECK(typeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "type", typeItem));

  cJSON* nameArray = cJSON_CreateArray();
  RAW_NULL_CHECK(nameArray);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableNameList", nameArray));

  // one string entry per dropped table
  int32_t idx = 0;
  while (idx < batch.nReqs) {
    cJSON* nameItem = cJSON_CreateString(batch.pReqs[idx].name);
    RAW_NULL_CHECK(nameItem);
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(nameArray, nameItem));
    idx++;
  }

end:
  uDebug("processDropTable return");
  tDecoderClear(&dec);
  *pJson = root;
}
1051

1052
/*
 * Replay a "create super table" meta message on the connection `taos`.
 *
 * The SVCreateStbReq is decoded from `meta` (after the leading SMsgHead),
 * converted into an SMCreateStbReq and sent as TDMT_MND_CREATE_STB. When the
 * request succeeds, the table's cached meta is removed from the catalog.
 *
 * @param taos     connection handle; must not be NULL
 * @param meta     raw meta message, starting with an SMsgHead; must not be NULL
 * @param metaLen  size of `meta` in bytes; must cover at least the SMsgHead
 * @return TSDB_CODE_SUCCESS on success, otherwise an error code
 */
static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen below the head size would wrap the unsigned payload length computed below
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateStbReq req = {0};
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // a request without compression info gets per-type default compression
  int8_t           createDefaultCompress = 0;
  SColCmprWrapper* p = &req.colCmpr;
  if (p->nCols == 0) {
    createDefaultCompress = 1;
  }
  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
  RAW_NULL_CHECK(pReq.pColumns);
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema*          pSchema = req.schemaRow.pSchema + i;
    SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);

    if (createDefaultCompress) {
      field.compress = createDefaultColCmprByType(pSchema->type);
    } else {
      SColCmpr* pCmp = &req.colCmpr.pColCmpr[i];
      field.compress = pCmp->alg;
    }
    if (req.pExtSchemas) field.typeMod = req.pExtSchemas[i].typeMod;
    RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field));
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  RAW_NULL_CHECK(pReq.pTags);
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
    RAW_NULL_CHECK(taosArrayPush(pReq.pTags, &field));
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = processSuid(req.suid, pRequest->pDb);
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;

  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // first pass with a NULL buffer only computes the serialized size
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  if (pCmdMsg.msgLen <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  if (tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    taosMemoryFree(pCmdMsg.pMsg);
    goto end;
  }

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest

  taosMemoryFree(pCmdMsg.pMsg);

  // drop the cached meta so the next lookup sees the new table
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SCatalog* pCatalog = NULL;
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  return code;
}
1166

1167
/*
 * Replay a "drop super table" meta message on the connection `taos`.
 *
 * The SVDropStbReq is decoded from `meta` (after the leading SMsgHead). The
 * table's uid is looked up through the catalog and used as the suid of a
 * TDMT_MND_DROP_STB request. A table that does not exist is treated as success.
 * When the request succeeds, the table's cached meta is removed from the catalog.
 *
 * @param taos     connection handle; must not be NULL
 * @param meta     raw meta message, starting with an SMsgHead; must not be NULL
 * @param metaLen  size of `meta` in bytes; must cover at least the SMsgHead
 * @return TSDB_CODE_SUCCESS on success, otherwise an error code
 */
static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen below the head size would wrap the unsigned payload length computed below
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropStbReq req = {0};
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
  SName            pName = {0};
  toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName);
  STableMeta* pTableMeta = NULL;
  code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
  // nothing to drop: report success
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    code = TSDB_CODE_SUCCESS;
    taosMemoryFreeClear(pTableMeta);
    goto end;
  }
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }
  // use the uid known to this cluster rather than the one carried in the message
  pReq.suid = pTableMeta->uid;
  taosMemoryFreeClear(pTableMeta);

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;
  //  pReq.suid = processSuid(req.suid, pRequest->pDb);

  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  if (tNameExtractFullName(&tableName, pReq.name) != 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  // first pass with a NULL buffer only computes the serialized size
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  if (pCmdMsg.msgLen <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  if (tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    taosMemoryFree(pCmdMsg.pMsg);
    goto end;
  }

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
  taosMemoryFree(pCmdMsg.pMsg);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // drop the cached meta; a failure here is returned to the caller
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  return code;
}
1268

1269
// Create-table requests grouped by target vgroup; taosCreateTable stores one
// of these per vgId in its hash map.
//   req    - batch whose pArray collects the SVCreateTbReq entries for the vgroup
//            (pArray is released by destroyCreateTbReqBatch)
//   info   - vgroup info returned by catalogGetTableHashVgroup
//   dbName - database name copied from the request's pDb
typedef struct SVgroupCreateTableBatch {
1270
  SVCreateTbBatchReq req;
1271
  SVgroupInfo        info;
1272
  char               dbName[TSDB_DB_NAME_LEN];
1273
} SVgroupCreateTableBatch;
1274

1275
static void destroyCreateTbReqBatch(void* data) {
5,757✔
1276
  if (data == NULL) {
5,757✔
1277
    uError("invalid parameter in %s", __func__);
×
1278
    return;
×
1279
  }
1280
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
5,757✔
1281
  taosArrayDestroy(pTbBatch->req.pArray);
5,757✔
1282
}
1283

1284
/*
 * Replay a "create table" batch meta message on the connection `taos`.
 *
 * The SVCreateTbBatchReq is decoded from `meta` (after the leading SMsgHead).
 * Each request is routed to its vgroup; for child tables the super table uid
 * and the tag column ids are remapped to those known to this cluster (child
 * tables whose super table does not exist are skipped). The per-vgroup batches
 * are then serialized and sent as TDMT_VND_CREATE_TABLE, and on success the
 * cached meta of the created tables is removed.
 *
 * @param taos     connection handle; must not be NULL
 * @param meta     raw meta message, starting with an SMsgHead; must not be NULL
 * @param metaLen  size of `meta` in bytes; must cover at least the SMsgHead
 * @return TSDB_CODE_SUCCESS on success, otherwise an error code
 */
static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen below the head size would wrap the unsigned payload length computed below
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;
  // keeps the rebuilt tags alive until the request has been sent
  SArray*            pTagList = taosArrayInit(0, POINTER_BYTES);
  RAW_NULL_CHECK(pTagList);
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
    // change tag cid to new cid
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      STableMeta* pTableMeta = NULL;
      SName       sName = {0};
      tb_uid_t    oldSuid = pCreateReq->ctb.suid;
      //      pCreateReq->ctb.suid = processSuid(pCreateReq->ctb.suid, pRequest->pDb);
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
      // super table unknown here: skip this child table
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        code = TSDB_CODE_SUCCESS;
        taosMemoryFreeClear(pTableMeta);
        continue;
      }

      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
      pCreateReq->ctb.suid = pTableMeta->uid;

      SArray* pTagVals = NULL;
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
      if (code != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pTableMeta);
        goto end;
      }

      // match each tag by name against the local super table schema and adopt its column id
      bool rebuildTag = false;
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
        if (tName == NULL) {
          continue;
        }
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
          SSchema* tag = &pTableMeta->schema[j];
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
            if (pTagVal) {
              if (pTagVal->cid != tag->colId) {
                pTagVal->cid = tag->colId;
                rebuildTag = true;
              }
            } else {
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
            }
          }
        }
      }
      taosMemoryFreeClear(pTableMeta);
      if (rebuildTag) {
        STag* ppTag = NULL;
        code = tTagNew(pTagVals, 1, false, &ppTag);
        taosArrayDestroy(pTagVals);
        pTagVals = NULL;
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        if (NULL == taosArrayPush(pTagList, &ppTag)) {
          // record the failure before cleanup; otherwise success would be returned
          code = terrno;
          tTagFree(ppTag);
          goto end;
        }
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
      }
      taosArrayDestroy(pTagVals);
    }
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      tBatch.req.source = TD_REQ_FROM_TAOX;
      // until the batch is stored in the hash map its array is owned here,
      // so release it on every failure path
      if (taosArrayPush(tBatch.req.pArray, pCreateReq) == NULL) {
        code = terrno;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != 0) {
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
    }
  }

  // every request was skipped: nothing to send
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
  // NOTE(review): pBufArray is not released in this function if one of the
  // nodesMakeNode calls below fails - confirm who owns it on those paths.
  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_NULL_CHECK(pQuery->pRoot);

  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteSVCreateTbBatchReq(&req);

  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  taosArrayDestroyP(pTagList, NULL);
  return code;
}
1462

1463
// Drop-table requests grouped by target vgroup.
//   req    - batch whose pArray is released by destroyDropTbReqBatch
//   info   - presumably the vgroup the batch is sent to - TODO confirm against taosDropTable
//   dbName - presumably the database name of the request - TODO confirm against taosDropTable
typedef struct SVgroupDropTableBatch {
1464
  SVDropTbBatchReq req;
1465
  SVgroupInfo      info;
1466
  char             dbName[TSDB_DB_NAME_LEN];
1467
} SVgroupDropTableBatch;
1468

1469
static void destroyDropTbReqBatch(void* data) {
179✔
1470
  if (data == NULL) {
179✔
1471
    uError("invalid parameter in %s", __func__);
×
1472
    return;
×
1473
  }
1474
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
179✔
1475
  taosArrayDestroy(pTbBatch->req.pArray);
179✔
1476
}
1477

1478
/*
 * Replay a raw drop-table meta message against the connection's current database.
 *
 * The payload that follows the SMsgHead is decoded as an SVDropTbBatchReq. Every
 * table that the catalog still knows is grouped by the vgroup returned from
 * catalogGetTableHashVgroup; the per-vgroup batches are serialized and executed
 * as a single TDMT_VND_DROP_TABLE query. Tables reported as
 * TSDB_CODE_PAR_TABLE_NOT_EXIST are skipped, and when nothing is left to drop
 * the function returns success without sending a query.
 *
 * @param taos     connection handle
 * @param meta     raw message, starting with an SMsgHead
 * @param metaLen  total length of meta in bytes (must be >= sizeof(SMsgHead))
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for bad arguments or an
 *         undecodable payload, TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request
 *         has no database, otherwise the code of the failing step or of the
 *         executed request.
 */
static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead: the unsigned subtraction below would otherwise
  // wrap around and let the decoder read far beyond the buffer
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));

  // vgId -> SVgroupDropTableBatch; destroyDropTbReqBatch frees each entry's array
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop over the tables to drop
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      // nothing to drop for this table; carry on with the remaining ones
      code = TSDB_CODE_SUCCESS;
      taosMemoryFreeClear(pTableMeta);
      continue;
    }
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
    // replace the suid carried in the message with the one the catalog reports
    tb_uid_t oldSuid = pDropReq->suid;
    pDropReq->suid = pTableMeta->suid;
    taosMemoryFreeClear(pTableMeta);
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
           pDropReq->suid);

    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pDropReq));
      RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)));
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
    }
  }

  // every table was already gone: succeed without issuing a query
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }
  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}
1593

1594
/*
 * Replay a raw delete-data meta message by turning it into a DELETE statement.
 *
 * The payload after the SMsgHead is decoded as an SDeleteRes; its table name,
 * timestamp column and [skey, ekey] range are formatted into SQL and executed
 * through taosQueryImpl. TSDB_CODE_PAR_TABLE_NOT_EXIST and
 * TSDB_CODE_PAR_GET_META_ERROR from the query are treated as success.
 *
 * @param taos     connection handle
 * @param meta     raw message, starting with an SMsgHead
 * @param metaLen  total length of meta in bytes (must be >= sizeof(SMsgHead))
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for bad arguments, an
 *         undecodable payload or a statement that does not fit the SQL buffer,
 *         terrno when no result object is returned, otherwise the query's code.
 */
static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead, otherwise the length handed to the decoder wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  char       sql[256] = {0};
  int32_t    code = TSDB_CODE_SUCCESS;

  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);

  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeDeleteRes(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  int sqlLen = snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                        req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  // never run a truncated statement: a cut-off timestamp would silently change the deleted range
  if (sqlLen < 0 || (size_t)sqlLen >= sizeof(sql)) {
    uError("delete sql does not fit buffer in %s, needed:%d", __func__, sqlLen);
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
  RAW_NULL_CHECK(res);
  SRequestObj* pRequest = (SRequestObj*)res;
  code = pRequest->code;
  // a missing table means there is nothing left to delete
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);

end:
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));
  tDecoderClear(&coder);
  return code;
}
1632

1633
/*
 * Replay a raw alter-table meta message against the connection's current database.
 *
 * The payload after the SMsgHead is decoded as an SVAlterTbReq, re-encoded with
 * source set to TD_REQ_FROM_TAOX behind a fresh SMsgHead addressed to the vgroup
 * returned by catalogGetTableHashVgroup, and executed as a TDMT_VND_ALTER_TABLE
 * query. Requests whose action is TSDB_ALTER_TABLE_UPDATE_OPTIONS are not
 * replayed. TSDB_CODE_TDB_TABLE_NOT_EXIST from the execution is mapped to success.
 *
 * @param taos     connection handle
 * @param meta     raw message, starting with an SMsgHead
 * @param metaLen  total length of meta in bytes (must be >= sizeof(SMsgHead))
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for bad arguments or an
 *         undecodable payload, TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request
 *         has no database, otherwise the code of the failing step.
 */
static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead, otherwise the length handed to the decoder wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVAlterTbReq   req = {0};
  SDecoder       dcoder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);
  if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // TSDB_ALTER_TABLE_UPDATE_OPTIONS is not replayed; leave with the current (success) code
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
  pArray = taosArrayInit(1, sizeof(void*));
  RAW_NULL_CHECK(pArray);

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  RAW_NULL_CHECK(pVgData);
  pVgData->vg = pInfo;

  // first pass computes the encoded size, second pass writes behind the message head
  int tlen = 0;
  req.source = TD_REQ_FROM_TAOX;
  tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
  if (code != 0) {
    code = terrno;
    goto end;
  }
  tlen += sizeof(SMsgHead);
  void* pMsg = taosMemoryMalloc(tlen);
  RAW_NULL_CHECK(pMsg);
  // attach the buffer to pVgData right away so every "goto end" below releases it
  pVgData->pData = pMsg;
  ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
  ((SMsgHead*)pMsg)->contLen = htonl(tlen);
  void*    pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  SEncoder coder = {0};
  tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
  code = tEncodeSVAlterTbReq(&coder, &req);
  if (code != 0) {
    tEncoderClear(&coder);
    code = terrno;
    goto end;
  }
  tEncoderClear(&coder);

  pVgData->size = tlen;

  pVgData->numOfTables = 1;
  RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));

  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code || NULL == pQuery) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);

  // from here on the cleanup below must not touch the array and data block any more
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
    code = TSDB_CODE_SUCCESS;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
    if (pRes->res != NULL) {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
end:
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&dcoder);
  qDestroyQuery(pQuery);
  taosArrayDestroy(req.pMultiTag);
  return code;
}
1752

1753
/*
 * Same as taos_write_raw_block_with_fields_with_reqid, called with a request
 * id of 0.
 */
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
                                     int numFields) {
  const int64_t defaultReqId = 0;
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, defaultReqId);
}
1757

1758
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
46✔
1759
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
1760
  if (taos == NULL || pData == NULL || tbname == NULL) {
46✔
UNCOV
1761
    uError("invalid parameter in %s", __func__);
×
1762
    return TSDB_CODE_INVALID_PARA;
×
1763
  }
1764
  int32_t     code = TSDB_CODE_SUCCESS;
46✔
1765
  STableMeta* pTableMeta = NULL;
46✔
1766
  SQuery*     pQuery = NULL;
46✔
1767
  SHashObj*   pVgHash = NULL;
46✔
1768

1769
  SRequestObj* pRequest = NULL;
46✔
1770
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
46✔
1771

1772
  uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
46✔
1773
         rows, pData, tbname, fields, numFields);
1774

1775
  pRequest->syncQuery = true;
46✔
1776
  if (!pRequest->pDb) {
46✔
UNCOV
1777
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1778
    goto end;
×
1779
  }
1780

1781
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
46✔
1782
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
46✔
1783
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
46✔
1784

1785
  struct SCatalog* pCatalog = NULL;
46✔
1786
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
46✔
1787

1788
  SRequestConnInfo conn = {0};
46✔
1789
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
46✔
1790
  conn.requestId = pRequest->requestId;
46✔
1791
  conn.requestObjRefId = pRequest->self;
46✔
1792
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
46✔
1793

1794
  SVgroupInfo vgData = {0};
46✔
1795
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
46✔
1796
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
46✔
1797
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
46✔
1798
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
46✔
1799
  RAW_NULL_CHECK(pVgHash);
46✔
1800
  RAW_RETURN_CHECK(
46✔
1801
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1802
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
46✔
1803
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
46✔
1804

1805
  launchQueryImpl(pRequest, pQuery, true, NULL);
46✔
1806
  code = pRequest->code;
46✔
1807

1808
end:
46✔
1809
  uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code));
46✔
1810
  taosMemoryFreeClear(pTableMeta);
46✔
1811
  qDestroyQuery(pQuery);
46✔
1812
  destroyRequest(pRequest);
46✔
1813
  taosHashCleanup(pVgHash);
46✔
1814
  return code;
46✔
1815
}
1816

1817
/* Same as taos_write_raw_block_with_reqid, called with a request id of 0. */
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  const int64_t defaultReqId = 0;
  return taos_write_raw_block_with_reqid(taos, rows, pData, tbname, defaultReqId);
}
1820

1821
/*
 * Write one raw data block into table `tbname` of the connection's current
 * database, without an explicit field list (rawBlockBindData is called with
 * fields == NULL and numFields == 0).
 *
 * @param rows   only used for the debug log in this function
 * @param reqid  request id forwarded to buildRequest
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when taos, pData or tbname
 *         is NULL, TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no
 *         database, otherwise the code of the failing step or of the request.
 */
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
  if (taos == NULL || pData == NULL || tbname == NULL) {
    // log the rejection like the other entry points in this file do
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t     code = TSDB_CODE_SUCCESS;
  STableMeta* pTableMeta = NULL;
  SQuery*     pQuery = NULL;
  SHashObj*   pVgHash = NULL;

  SRequestObj* pRequest = NULL;
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));

  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // fully qualified table name: account id + current database + tbname
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));

  struct SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  SVgroupInfo vgData = {0};
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
  // single-entry map vgId -> vgroup info, consumed by smlBuildOutput
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgHash);
  RAW_RETURN_CHECK(
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false));
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  taosMemoryFreeClear(pTableMeta);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  taosHashCleanup(pVgHash);
  return code;
}
1876

1877
static void* getRawDataFromRes(void* pRetrieve) {
5,697✔
1878
  if (pRetrieve == NULL) {
5,697✔
UNCOV
1879
    uError("invalid parameter in %s", __func__);
×
1880
    return NULL;
×
1881
  }
1882
  void* rawData = NULL;
5,697✔
1883
  // deal with compatibility
1884
  if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
5,697✔
UNCOV
1885
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
1886
  } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
5,697✔
UNCOV
1887
             *(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
×
1888
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
5,697✔
1889
  }
1890
  return rawData;
5,697✔
1891
}
1892

1893
/*
 * Decode every create-table request carried in rsp and store it in pHashObj,
 * keyed by table name. A request whose name is already present is destroyed
 * instead of being stored.
 *
 * @param rsp       data response; createTableReq / createTableLen are read for
 *                  indices [0, createTableNum)
 * @param pHashObj  destination map: table name -> SVCreateTbReq (copied by value)
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments,
 *         TSDB_CODE_INVALID_MSG when a request is not for a child table,
 *         otherwise the code of the failing step.
 */
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
  if (rsp == NULL || pHashObj == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // find schema data info
  int32_t       code = 0;
  SVCreateTbReq pCreateReq = {0};
  SDecoder      decoderTmp = {0};

  for (int j = 0; j < rsp->createTableNum; j++) {
    void** dataTmp = taosArrayGet(rsp->createTableReq, j);
    RAW_NULL_CHECK(dataTmp);
    int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j);
    RAW_NULL_CHECK(lenTmp);

    tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));

    if (pCreateReq.type != TSDB_CHILD_TABLE) {
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }
    size_t nameLen = strlen(pCreateReq.name);
    if (taosHashGet(pHashObj, pCreateReq.name, nameLen) == NULL) {
      RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, nameLen, &pCreateReq, sizeof(SVCreateTbReq)));
    } else {
      tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
    }
    // The request is now either held by the hash (byte copy) or destroyed. Forget the
    // local copy so a failure in a later iteration cannot destroy it a second time
    // from under the hash entry.
    pCreateReq = (SVCreateTbReq){0};

    tDecoderClear(&decoderTmp);
  }
  return 0;

end:
  // only a request that never made it into the hash is still held by pCreateReq here
  tDecoderClear(&decoderTmp);
  tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
  return code;
}
1933

1934
/* Initialization states for the write-raw machinery; initedFlag starts at WRITE_RAW_INIT_START. */
typedef enum {
  WRITE_RAW_INIT_START = 0,  // initial value
  WRITE_RAW_INIT_OK,
  WRITE_RAW_INIT_FAIL,
} WRITE_RAW_INIT_STATUS;
1939

1940
static SHashObj* writeRawCache = NULL;
1941
static int8_t    initFlag = 0;
1942
static int8_t    initedFlag = WRITE_RAW_INIT_START;
1943

1944
typedef struct {
1945
  SHashObj* pVgHash;
1946
  SHashObj* pNameHash;
1947
  SHashObj* pMetaHash;
1948
} rawCacheInfo;
1949

1950
typedef struct {
1951
  SVgroupInfo vgInfo;
1952
  int64_t     uid;
1953
  int64_t     suid;
1954
} tbInfo;
1955

1956
static void tmqFreeMeta(void* data) {
1,638✔
1957
  if (data == NULL) {
1,638✔
UNCOV
1958
    uError("invalid parameter in %s", __func__);
×
1959
    return;
×
1960
  }
1961
  STableMeta* pTableMeta = *(STableMeta**)data;
1,638✔
1962
  taosMemoryFree(pTableMeta);
1,638✔
1963
}
1964

UNCOV
1965
static void freeRawCache(void* data) {
×
1966
  if (data == NULL) {
×
1967
    uError("invalid parameter in %s", __func__);
×
1968
    return;
×
1969
  }
UNCOV
1970
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
1971
  taosHashCleanup(pRawCache->pMetaHash);
×
1972
  taosHashCleanup(pRawCache->pNameHash);
×
1973
  taosHashCleanup(pRawCache->pVgHash);
×
1974
}
1975

1976
static int32_t initRawCacheHash() {
595✔
1977
  if (writeRawCache == NULL) {
595✔
1978
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
595✔
1979
    if (writeRawCache == NULL) {
595✔
UNCOV
1980
      return terrno;
×
1981
    }
1982
    taosHashSetFreeFp(writeRawCache, freeRawCache);
595✔
1983
  }
1984
  return 0;
595✔
1985
}
1986

1987
/*
 * Decide whether the cached table meta no longer matches the schema that comes
 * with a raw block.
 *
 * Returns true when the column counts differ, when a schema column name is not
 * found in the meta, or when checkSchema rejects a matching column; false
 * otherwise, and also false when any argument is NULL.
 */
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
  if (rawData == NULL || pSW == NULL) {
    return false;
  }
  if (pTableMeta == NULL) {
    uError("invalid parameter in %s", __func__);
    return false;
  }

  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
  // column length |
  // The code steps over five int32 fields and one uint64 to reach the column schema.
  char* cursor = (char*)rawData;
  cursor += 5 * sizeof(int32_t);
  cursor += sizeof(uint64_t);
  int8_t* fields = cursor;

  if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
    return true;
  }

  // each column schema entry in the block is an int8 followed by an int32
  for (int i = 0; i < pSW->nCols; i++, fields += sizeof(int8_t) + sizeof(int32_t)) {
    char* fieldName = pSW->pSchema[i].name;
    bool  found = false;

    for (int j = 0; j < pTableMeta->tableInfo.numOfColumns && !found; j++) {
      if (strcmp(pTableMeta->schema[j].name, fieldName) != 0) {
        continue;
      }
      if (checkSchema(&pTableMeta->schema[j], &pTableMeta->schemaExt[j], fields, NULL, 0) != 0) {
        return true;
      }
      found = true;
    }

    if (!found) return true;
  }
  return false;
}
2030

2031
/*
 * Fetch the three cache hash maps registered in writeRawCache under the pointer
 * value `key`, creating and registering them on the first request.
 *
 * @param pVgHash    out: vgId -> SVgroupInfo map
 * @param pNameHash  out: table name -> tbInfo map
 * @param pMetaHash  out: suid -> STableMeta* map (entries freed by tmqFreeMeta)
 * @param key        cache key; the pointer value itself is hashed
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, otherwise
 *         the code of the failing step. On failure all three outputs are NULL.
 */
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t       code = 0;
  rawCacheInfo* cached = (rawCacheInfo*)taosHashGet(writeRawCache, &key, POINTER_BYTES);
  if (cached != NULL) {
    *pVgHash = cached->pVgHash;
    *pNameHash = cached->pNameHash;
    *pMetaHash = cached->pMetaHash;
    return 0;
  }

  // start from a known state so the failure path releases only what this call created
  *pVgHash = NULL;
  *pNameHash = NULL;
  *pMetaHash = NULL;

  *pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(*pVgHash);
  *pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(*pNameHash);
  *pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(*pMetaHash);
  taosHashSetFreeFp(*pMetaHash, tmqFreeMeta);
  rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash};
  RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));

  return 0;
end:
  taosHashCleanup(*pMetaHash);
  taosHashCleanup(*pNameHash);
  taosHashCleanup(*pVgHash);
  // do not hand destroyed maps back to the caller
  *pMetaHash = NULL;
  *pNameHash = NULL;
  *pVgHash = NULL;
  return code;
}
2062

2063
/*
 * Build a synchronous request object for `taos`, look up the catalog handle of
 * its cluster and fill `conn` from the request.
 *
 * The request created by buildRequest is left in *pRequest even when a later
 * step fails; this function never destroys it.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments,
 *         TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
 *         otherwise the code returned by buildRequest or catalogGetHandle.
 */
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
  if (taos == NULL || pRequest == NULL || pCatalog == NULL || conn == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));

  SRequestObj* request = *pRequest;
  request->syncQuery = true;
  if (request->pDb == NULL) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  RAW_RETURN_CHECK(catalogGetHandle(request->pTscObj->pAppInfo->clusterId, pCatalog));

  conn->pTrans = request->pTscObj->pAppInfo->pTransporter;
  conn->requestId = request->requestId;
  conn->requestObjRefId = request->self;
  conn->mgmtEps = getEpSet_s(&request->pTscObj->pAppInfo->mgmtEp);

end:
  return code;
}
2085

2086
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
2087
static int32_t  decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func,
2,141✔
2088
                              SMqRspObj* rspObj) {
2089
  if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) {
2,141✔
UNCOV
2090
    uError("invalid parameter in %s", __func__);
×
2091
    return TSDB_CODE_INVALID_PARA;
×
2092
  }
2093
  int8_t dataVersion = *(int8_t*)data;
2,141✔
2094
  if (dataVersion >= MQ_DATA_RSP_VERSION) {
2,141✔
2095
    data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
2,141✔
2096
    if (dataLen < sizeof(int8_t) + sizeof(int32_t)) {
2,141✔
UNCOV
2097
      return TSDB_CODE_INVALID_PARA;
×
2098
    }
2099
    dataLen -= sizeof(int8_t) + sizeof(int32_t);
2,141✔
2100
  }
2101

2102
  rspObj->resIter = -1;
2,141✔
2103
  tDecoderInit(decoder, data, dataLen);
2,141✔
2104
  int32_t code = func(decoder, &rspObj->dataRsp);
2,141✔
2105
  if (code != 0) {
2,141✔
UNCOV
2106
    SET_ERROR_MSG("decode mq taosx data rsp failed");
×
2107
  }
2108
  return code;
2,141✔
2109
}
2110

2111
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
5,697✔
2112
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
2113
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
2114
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
5,697✔
2115
      pMeta == NULL) {
UNCOV
2116
    uError("invalid parameter in %s", __func__);
×
2117
    return TSDB_CODE_INVALID_PARA;
×
2118
  }
2119
  int32_t     code = 0;
5,697✔
2120
  STableMeta* pTableMeta = NULL;
5,697✔
2121
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
5,697✔
2122
  if (tmpInfo == NULL || retry > 0) {
5,697✔
2123
    tbInfo info = {0};
4,982✔
2124

2125
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
4,982✔
2126
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
4,982✔
2127
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
648✔
2128
    }
2129
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
4,982✔
2130
    info.uid = pTableMeta->uid;
4,982✔
2131
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
4,982✔
2132
      info.suid = pTableMeta->suid;
3,664✔
2133
    } else {
2134
      info.suid = pTableMeta->uid;
1,318✔
2135
    }
2136
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
4,982✔
2137
    if (code != 0) {
4,982✔
UNCOV
2138
      taosMemoryFree(pTableMeta);
×
2139
      goto end;
×
2140
    }
2141
    uDebug("put table meta to hash1, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d", info.suid, taosHashGetSize(pMetaHash),
4,982✔
2142
           taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2143
    if (pCreateReqDst) {
4,982✔
2144
      pTableMeta->vgId = info.vgInfo.vgId;
648✔
2145
      pTableMeta->uid = pCreateReqDst->uid;
648✔
2146
      pCreateReqDst->ctb.suid = pTableMeta->suid;
648✔
2147
    }
2148

2149
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
4,982✔
2150
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
4,982✔
2151
    RAW_RETURN_CHECK(
4,982✔
2152
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
2153
  }
2154

2155
  if (pTableMeta == NULL || retry > 0) {
5,697✔
2156
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
715✔
2157
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
715✔
2158
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
162✔
2159
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
162✔
2160
      if (code != 0) {
162✔
UNCOV
2161
        taosMemoryFree(pTableMeta);
×
2162
        goto end;
×
2163
      }
2164
      uDebug("put table meta to hash2, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d", tmpInfo->suid, taosHashGetSize(pMetaHash),
162✔
2165
      taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2166
    } else {
2167
      pTableMeta = *pTableMetaTmp;
553✔
2168
      pTableMeta->uid = tmpInfo->uid;
553✔
2169
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
553✔
2170
    }
2171
  }
2172
  *pMeta = pTableMeta;
5,697✔
2173

2174
end:
5,697✔
2175
  return code;
5,697✔
2176
}
2177

2178
/*
 * Writes a serialized data response into the database behind `taos`.
 *
 * The buffer is decoded through decodeRawData() using tDecodeMqDataRsp. Every
 * block of the response is bound to a fresh query with rawBlockBindData() and
 * the query is launched. When the request ends with a code accepted by
 * NEED_CLIENT_HANDLE_ERROR the query is destroyed, the block iterator is
 * rewound and the whole batch is rebuilt, at most three more times.
 *
 * If the response was produced without schema information the function leaves
 * the block loop immediately without changing `code`.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the last code
 * recorded by the steps above.
 */
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};
  SDecoder         decoder = {0};
  SMqRspObj        rspObj = {0};
  SQuery*          pQuery = NULL;
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;
  int              retry = 0;

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);

    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      if (!rspObj.dataRsp.withSchema) {
        goto end;
      }

      // per-block inputs: table name, schema and payload share the same index
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSchema = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
      RAW_NULL_CHECK(pSchema);
      void* pBlock = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pBlock);
      void* pRaw = getRawDataFromRes(pBlock);
      RAW_NULL_CHECK(pRaw);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName name = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(name.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(name.tname, tbName, TSDB_TABLE_NAME_LEN);

      STableMeta* pMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &name, &pMeta, pSchema,
                                        pRaw, retry));

      char errBuf[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pQuery, pMeta, pRaw, NULL, pSchema, pSchema->nCols, true, errBuf, ERR_MSG_LEN, true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", name.tname, errBuf);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // stop unless the error is one the client may handle and retries remain
    if (!(NEED_CLIENT_HANDLE_ERROR(code)) || retry++ >= 3) {
      break;
    }

    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;  // rewind so the next pass walks every block again
  }

end:
  uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  return code;
}
2254

2255
/*
 * Writes a serialized "metadata" response (data blocks plus the create-table
 * requests that accompany them) into the database behind `taos`.
 *
 * The buffer is decoded through decodeRawData() using tDecodeSTaosxRsp, and
 * buildCreateTbMap() fills a hash that is looked up by table name for each
 * block. Each block is bound with rawBlockBindData(), passing the matching
 * create-table request when one exists. The retry policy is: on a code
 * accepted by NEED_CLIENT_HANDLE_ERROR, rebuild and resubmit at most three
 * more times.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the last code
 * recorded by the steps above.
 */
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};
  SDecoder         decoder = {0};
  SMqRspObj        rspObj = {0};
  SQuery*          pQuery = NULL;
  SHashObj*        pCreateTbHash = NULL;
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;
  int              retry = 0;

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));

  pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pCreateTbHash);
  RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);

    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      if (!rspObj.dataRsp.withSchema) {
        goto end;
      }

      // per-block inputs: table name, schema and payload share the same index
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSchema = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
      RAW_NULL_CHECK(pSchema);
      void* pBlock = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pBlock);
      void* pRaw = getRawDataFromRes(pBlock);
      RAW_NULL_CHECK(pRaw);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName name = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(name.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(name.tname, tbName, TSDB_TABLE_NAME_LEN);

      // create-table request recorded for this table name, NULL when there is none
      SVCreateTbReq* pCreateReq = (SVCreateTbReq*)taosHashGet(pCreateTbHash, name.tname, strlen(name.tname));
      STableMeta*    pMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReq, pCatalog, &conn, &name, &pMeta,
                                        pSchema, pRaw, retry));

      char errBuf[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pQuery, pMeta, pRaw, pCreateReq, pSchema, pSchema->nCols, true, errBuf, ERR_MSG_LEN,
                              true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", name.tname, errBuf);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // stop unless the error is one the client may handle and retries remain
    if (!(NEED_CLIENT_HANDLE_ERROR(code)) || retry++ >= 3) {
      break;
    }

    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;  // rewind so the next pass walks every block again
  }

end:
  uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteSTaosxRsp(&rspObj.dataRsp);
  // release every create-table request stored in the hash before dropping the hash itself
  for (void* pIter = taosHashIterate(pCreateTbHash, NULL); pIter; pIter = taosHashIterate(pCreateTbHash, pIter)) {
    tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
  }
  taosHashCleanup(pCreateTbHash);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  return code;
}
2346

UNCOV
2347
/*
 * Writes a serialized raw-data response into the database behind `taos`.
 *
 * The buffer is decoded through decodeRawData() using tDecodeMqDataRsp. For
 * every block the table meta is resolved with processCacheMeta() and the
 * payload is attached to the statement's vgroup data blocks by
 * rawBlockBindRawData(). The temporary vgroup hash only lives while the
 * blocks of one attempt are being bound. On a code accepted by
 * NEED_CLIENT_HANDLE_ERROR the batch is rebuilt and resubmitted, at most
 * three more times.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the last code
 * recorded by the steps above.
 */
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t   code = TSDB_CODE_SUCCESS;
  SQuery*   pQuery = NULL;
  SHashObj* pVgroupHash = NULL;
  SMqRspObj rspObj = {0};
  SDecoder  decoder = {0};

  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};

  SHashObj* pVgHash = NULL;
  SHashObj* pNameHash = NULL;
  SHashObj* pMetaHash = NULL;
  int       retry = 0;

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));

  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
  while (1) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
    pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(pVgroupHash);
    pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
    RAW_NULL_CHECK(pStmt->pVgDataBlocks);

    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // find schema data info
      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, NULL,
                                        NULL, retry));
      code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
      if (code != TSDB_CODE_SUCCESS) {
        // rawBlockBindRawData() fills no message buffer, so report the textual
        // form of the code instead of an always-empty detail string
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, tstrerror(code));
        goto end;
      }
    }
    // the hash is only needed while binding; drop it before building the output
    taosHashCleanup(pVgroupHash);
    pVgroupHash = NULL;

    RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
      qDestroyQuery(pQuery);
      pQuery = NULL;
      rspObj.resIter = -1;  // rewind so the next pass walks every block again
      continue;
    }
    break;
  }

end:
  uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  taosHashCleanup(pVgroupHash);
  destroyRequest(pRequest);
  return code;
}
2430

2431
/*
 * Dispatches a single meta response to the converter that matches its
 * message type; the converter receives `meta` as its output slot.
 *
 * Message types without a converter leave `*meta` untouched. NULL arguments
 * are logged and ignored.
 */
static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
  if (pMetaRsp == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }

  switch (pMetaRsp->resMsgType) {
    case TDMT_VND_CREATE_STB:
      processCreateStb(pMetaRsp, meta);
      break;
    case TDMT_VND_ALTER_STB:
      processAlterStb(pMetaRsp, meta);
      break;
    case TDMT_VND_DROP_STB:
      processDropSTable(pMetaRsp, meta);
      break;
    case TDMT_VND_CREATE_TABLE:
      processCreateTable(pMetaRsp, meta);
      break;
    case TDMT_VND_ALTER_TABLE:
      processAlterTable(pMetaRsp, meta);
      break;
    case TDMT_VND_DROP_TABLE:
      processDropTable(pMetaRsp, meta);
      break;
    case TDMT_VND_DELETE:
      processDeleteTable(pMetaRsp, meta);
      break;
    default:
      break;
  }
}
2452

2453
/*
 * Converts a batch meta response into a JSON document of the form
 * {"tmq_meta_version": ..., "metas": [ ... ]} and stores the unformatted text
 * in `*string`.
 *
 * Each batch entry is decoded after skipping its SMqRspHead and converted by
 * processSimpleMeta(). On any failure `*string` is left untouched and all
 * intermediate objects are released.
 */
static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
  if (pMsgRsp == NULL || string == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDecoder        coder = {0};
  SMqBatchMetaRsp rsp = {0};
  int32_t         code = 0;  // written by the RAW_*_CHECK macros
  cJSON*          pJson = NULL;
  cJSON*          pMetaArr = NULL;

  tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
  if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
    goto end;
  }

  pJson = cJSON_CreateObject();
  RAW_NULL_CHECK(pJson);
  RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION));
  pMetaArr = cJSON_CreateArray();
  RAW_NULL_CHECK(pMetaArr);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(pJson, "metas", pMetaArr));

  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // an entry shorter than its header would make the subtraction below wrap
    // around to a huge unsigned size
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }

    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
      tDeleteMqMetaRsp(&metaRsp);
      tDecoderClear(&metaCoder);
      goto end;
    }
    cJSON* pItem = NULL;
    processSimpleMeta(&metaRsp, &pItem);
    tDeleteMqMetaRsp(&metaRsp);
    tDecoderClear(&metaCoder);
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(pMetaArr, pItem));
  }

  *string = cJSON_PrintUnformatted(pJson);

end:
  // shared by the success and failure paths: the JSON tree is no longer
  // needed once it has been printed
  cJSON_Delete(pJson);
  tDeleteMqBatchMetaRsp(&rsp);
  tDecoderClear(&coder);
}
2502

2503
/*
 * Returns the JSON description of a meta-carrying TMQ result, or NULL.
 *
 * Only results for which TD_RES_TMQ_META, TD_RES_TMQ_METADATA or
 * TD_RES_TMQ_BATCH_META holds are accepted; anything else (including a NULL
 * `res`) yields NULL. The result can also be NULL when the conversion itself
 * produces nothing. The returned string is released with
 * tmq_free_json_meta().
 */
char* tmq_get_json_meta(TAOS_RES* res) {
  if (res == NULL) {
    uError("invalid parameter in %s", __func__);
    return NULL;
  }
  uDebug("tmq_get_json_meta res:%p", res);
  if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res) && !TD_RES_TMQ_BATCH_META(res)) {
    return NULL;
  }

  char*      string = NULL;
  SMqRspObj* rspObj = (SMqRspObj*)res;
  if (TD_RES_TMQ_METADATA(res)) {
    processAutoCreateTable(&rspObj->dataRsp, &string);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    processBatchMetaToJson(&rspObj->batchMetaRsp, &string);
  } else if (TD_RES_TMQ_META(res)) {
    cJSON* pJson = NULL;
    processSimpleMeta(&rspObj->metaRsp, &pJson);
    string = cJSON_PrintUnformatted(pJson);
    cJSON_Delete(pJson);
  } else {
    uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res);
  }

  // `string` is still NULL when a conversion failed; never hand NULL to %s
  uDebug("tmq_get_json_meta string:%s", string != NULL ? string : "(null)");
  return string;
}
2531

2532
/* Releases a string previously returned by tmq_get_json_meta(). */
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}
11,736✔
2533

2534
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
2,141✔
2535
  if (pRsp == NULL) {
2,141✔
UNCOV
2536
    uError("invalid parameter in %s", __func__);
×
2537
    return TSDB_CODE_INVALID_PARA;
×
2538
  }
2539
  SEncoder coder = {0};
2,141✔
2540
  tEncoderInit(&coder, NULL, 0);
2,141✔
2541
  if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1;
2,141✔
2542
  if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1;
2,141✔
2543
  int32_t pos = coder.pos;
2,141✔
2544
  tEncoderClear(&coder);
2,141✔
2545
  return pos;
2,141✔
2546
}
2547

2548
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
2549
/*
 * Serializes `rspObj` into a newly allocated buffer and hands it to `raw`.
 *
 * Layout written to the buffer: one byte MQ_DATA_RSP_VERSION, a 32-bit value
 * holding the result of getOffSetLen(), then whatever `encodeFunc` emits.
 *
 * On success `raw->raw` / `raw->raw_len` describe the buffer and 0 is
 * returned. On failure the buffer is freed, `raw` is not modified and an
 * error code is returned (terrno for the allocation failure,
 * TSDB_CODE_INVALID_MSG for encoding problems, TSDB_CODE_INVALID_PARA for
 * NULL arguments).
 */
static int32_t  encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
  if (raw == NULL || encodeFunc == NULL || rspObj == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  SEncoder encoder = {0};
  void*    buf = NULL;
  uint32_t len = 0;
  int32_t  code = 0;
  int32_t  offsetLen = 0;

  // first pass: measure the encoded size of the response body
  tEncodeSize(encodeFunc, rspObj, len, code);
  if (code < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto FAILED;
  }

  // room for the version byte and the offset length that precede the body
  len += sizeof(int8_t) + sizeof(int32_t);
  buf = taosMemoryCalloc(1, len);
  if (buf == NULL) {
    code = terrno;
    goto FAILED;
  }

  // second pass: header fields followed by the body, stopping at the first failure
  tEncoderInit(&encoder, buf, len);
  if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0 || (offsetLen = getOffSetLen(rspObj)) <= 0 ||
      tEncodeI32(&encoder, offsetLen) < 0 || encodeFunc(&encoder, rspObj) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto FAILED;
  }
  tEncoderClear(&encoder);

  raw->raw = buf;
  raw->raw_len = len;
  return code;

FAILED:
  tEncoderClear(&encoder);
  taosMemoryFree(buf);
  return code;
}
2597

2598
/*
 * Extracts the raw payload of a TMQ result into `raw`.
 *
 * `*raw` is zeroed first. Depending on the result type:
 *  - meta:       `raw` refers to the buffer held by the result's metaRsp;
 *  - data and
 *    metadata:   the response is re-encoded by encodeMqDataRsp() into a new
 *                buffer;
 *  - batch meta: `raw` refers to the batch buffer held by the result;
 *  - raw data:   the buffer pointer is moved out of the result, whose own
 *                pointer is reset to NULL.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments, the
 * code reported by encodeMqDataRsp(), or TSDB_CODE_TMQ_INVALID_MSG for an
 * unrecognised result type.
 */
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (raw == NULL || res == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  *raw = (tmq_raw_data){0};
  SMqRspObj* rspObj = ((SMqRspObj*)res);
  if (TD_RES_TMQ_META(res)) {
    raw->raw = rspObj->metaRsp.metaRsp;
    raw->raw_len = rspObj->metaRsp.metaRspLen >= 0 ? rspObj->metaRsp.metaRspLen : 0;
    raw->raw_type = rspObj->metaRsp.resMsgType;
    uDebug("tmq get raw type meta:%p", raw);
  } else if (TD_RES_TMQ(res)) {
    int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw);
    if (code != 0) {
      // log the code that is actually returned; terrno is not set by every
      // failure path of encodeMqDataRsp()
      uError("tmq get raw type error:%d", code);
      return code;
    }
    raw->raw_type = RES_TYPE__TMQ;
    uDebug("tmq get raw type data:%p", raw);
  } else if (TD_RES_TMQ_METADATA(res)) {
    int32_t code = encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw);
    if (code != 0) {
      uError("tmq get raw type error:%d", code);
      return code;
    }
    raw->raw_type = RES_TYPE__TMQ_METADATA;
    uDebug("tmq get raw type metadata:%p", raw);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    raw->raw = rspObj->batchMetaRsp.pMetaBuff;
    raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw batch meta:%p", raw);
  } else if (TD_RES_TMQ_RAW(res)) {
    raw->raw = rspObj->dataRsp.rawData;
    rspObj->dataRsp.rawData = NULL;  // the buffer now belongs to `raw`
    raw->raw_len = rspObj->dataRsp.len;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw raw:%p", raw);
  } else {
    uError("tmq get raw error type:%d", *(int8_t*)res);
    return TSDB_CODE_TMQ_INVALID_MSG;
  }
  return TSDB_CODE_SUCCESS;
}
2643

2644
/*
 * Releases the buffer referenced by a tmq_raw_data value and clears the
 * error message buffer `terrMsg`.
 *
 * Buffers of type RES_TYPE__TMQ and RES_TYPE__TMQ_METADATA are freed as they
 * are. For RES_TYPE__TMQ_RAWDATA the pointer is moved back by
 * sizeof(SMqRspHead) before it is freed. Other types are not freed here.
 */
void tmq_free_raw(tmq_raw_data raw) {
  uDebug("tmq free raw data type:%d", raw.raw_type);
  if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
    taosMemoryFree(raw.raw);
  } else if (raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL) {
    // use a signed offset: negating sizeof() directly yields a huge unsigned
    // value and relies on pointer wrap-around to step backwards
    taosMemoryFree(POINTER_SHIFT(raw.raw, -(int64_t)sizeof(SMqRspHead)));
  }
  (void)memset(terrMsg, 0, ERR_MSG_LEN);
}
12,332✔
2653

2654
static int32_t writeRawInit() {
16,269✔
2655
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
16,864✔
2656
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
595✔
2657
    if (old == 0) {
595✔
2658
      int32_t code = initRawCacheHash();
595✔
2659
      if (code != 0) {
595✔
UNCOV
2660
        uError("tmq writeRawImpl init error:%d", code);
×
2661
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
2662
        return code;
×
2663
      }
2664
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
595✔
2665
    }
2666
  }
2667

2668
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
16,269✔
UNCOV
2669
    return TSDB_CODE_INTERNAL_ERROR;
×
2670
  }
2671
  return 0;
16,269✔
2672
}
2673

2674
/*
 * Routes a raw payload to the writer that matches `type`.
 *
 * `type` is either a vnode message type (TDMT_VND_*) or a TMQ result type
 * (RES_TYPE__TMQ*). Both TDMT_VND_CREATE_STB and TDMT_VND_ALTER_STB go to
 * taosCreateStb().
 *
 * Returns the writer's result, TSDB_CODE_INVALID_PARA for NULL arguments or
 * an unknown `type`, and TSDB_CODE_INTERNAL_ERROR when writeRawInit() fails.
 */
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
  if (taos == NULL || buf == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  if (writeRawInit() != 0) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  // schema-level messages
  if (type == TDMT_VND_CREATE_STB || type == TDMT_VND_ALTER_STB) {
    return taosCreateStb(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_STB) {
    return taosDropStb(taos, buf, len);
  }
  if (type == TDMT_VND_CREATE_TABLE) {
    return taosCreateTable(taos, buf, len);
  }
  if (type == TDMT_VND_ALTER_TABLE) {
    return taosAlterTable(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_TABLE) {
    return taosDropTable(taos, buf, len);
  }
  if (type == TDMT_VND_DELETE) {
    return taosDeleteData(taos, buf, len);
  }

  // TMQ result payloads
  if (type == RES_TYPE__TMQ_METADATA) {
    return tmqWriteRawMetaDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_RAWDATA) {
    return tmqWriteRawRawDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ) {
    return tmqWriteRawDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_BATCH_META) {
    return tmqWriteBatchMetaDataImpl(taos, buf, len);
  }

  return TSDB_CODE_INVALID_PARA;
}
2708

2709
/*
 * Public entry point: writes a raw payload obtained from tmq_get_raw() into
 * the database behind `taos`.
 *
 * Rejects a NULL connection, a NULL buffer or a non-positive length with
 * TSDB_CODE_INVALID_PARA (and records an error message). Otherwise the
 * global error message is cleared and the payload is dispatched through
 * writeRawImpl().
 */
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
  const bool usable = (taos != NULL) && (raw.raw != NULL) && (raw.raw_len > 0);
  if (!usable) {
    SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
    return TSDB_CODE_INVALID_PARA;
  }

  taosClearErrMsg();  // start from a clean global error message
  return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
}
2718

2719
/*
 * Replays a batch meta payload: decodes the batch, then decodes every entry
 * (after skipping its SMqRspHead) and writes it through writeRawImpl() using
 * the entry's own message type.
 *
 * Processing stops at the first entry that fails. Returns TSDB_CODE_SUCCESS,
 * TSDB_CODE_INVALID_PARA for NULL arguments or an undecodable / too short
 * entry, terrno when an array lookup fails, or the code from writeRawImpl().
 */
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SMqBatchMetaRsp rsp = {0};
  SDecoder        coder = {0};
  int32_t         code = TSDB_CODE_SUCCESS;

  // decode and process req
  tDecoderInit(&coder, meta, metaLen);
  if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // an entry shorter than its header would make the subtraction below wrap
    // around to a huge unsigned size
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }

    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
      tDeleteMqMetaRsp(&metaRsp);
      tDecoderClear(&metaCoder);
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }
    code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
    tDeleteMqMetaRsp(&metaRsp);
    tDecoderClear(&metaCoder);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
  }

end:
  tDeleteMqBatchMetaRsp(&rsp);
  tDecoderClear(&coder);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc