• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3552

11 Dec 2024 06:08AM UTC coverage: 62.526% (+0.7%) from 61.798%
#3552

push

travis-ci

web-flow
Merge pull request #29092 from taosdata/fix/3.0/TD-33146

fix:[TD-33146] stmt_get_tag_fields return error code

124833 of 255773 branches covered (48.81%)

Branch coverage included in aggregate %.

209830 of 279467 relevant lines covered (75.08%)

19111707.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.58
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "parser.h"
19
#include "tcol.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tdef.h"
23
#include "tglobal.h"
24
#include "tmsgtype.h"
25

26
/*
 * Error-propagation helpers. Each macro expects the enclosing function to
 * declare a local `code` variable and to provide an `end:` label.
 * The argument is parenthesized so that compound expressions such as
 * `a && b` or `cond ? p : q` are tested as a whole, and it is evaluated
 * exactly once.
 */

/* Jump to `end` with code = terrno when the pointer expression is NULL. */
#define RAW_NULL_CHECK(c) \
  do {                    \
    if ((c) == NULL) {    \
      code = terrno;      \
      goto end;           \
    }                     \
  } while (0)

/* Jump to `end` with code = TSDB_CODE_INVALID_PARA when the expression is false. */
#define RAW_FALSE_CHECK(c)           \
  do {                               \
    if (!(c)) {                      \
      code = TSDB_CODE_INVALID_PARA; \
      goto end;                      \
    }                                \
  } while (0)

/* Store the expression in `code` and jump to `end` when it is non-zero. */
#define RAW_RETURN_CHECK(c) \
  do {                      \
    code = (c);             \
    if (code != 0) {        \
      goto end;             \
    }                       \
  } while (0)

/* Log prefix format and its matching arguments; LOG_ID_VALUE needs `taos` and `pRequest` in scope. */
#define LOG_ID_TAG   "connId:0x%" PRIx64 ",QID:0x%" PRIx64
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId

#define TMQ_META_VERSION "1.0"
54

55
/*
 * Attach `item` to `object` under the key `string`.
 * When cJSON_AddItemToObject reports failure the item is deleted here, so the
 * caller must not use it afterwards. Returns true when the item was attached.
 */
static bool tmqAddJsonObjectItem(cJSON *object, const char *string, cJSON *item) {
  if (cJSON_AddItemToObject(object, string, item)) {
    return true;
  }
  cJSON_Delete(item);
  return false;
}
62
/*
 * Append `item` to `array`.
 * When cJSON_AddItemToArray reports failure the item is deleted here, so the
 * caller must not use it afterwards. Returns true when the item was appended.
 */
static bool tmqAddJsonArrayItem(cJSON *array, cJSON *item) {
  if (cJSON_AddItemToArray(array, item)) {
    return true;
  }
  cJSON_Delete(item);
  return false;
}
69

70

71
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen);
72
/* Returns `suid` plus MurmurHash3_32 computed over the characters of `db` (terminator excluded). */
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
  size_t dbLen = strlen(db);
  return suid + MurmurHash3_32(db, dbLen);
}
118✔
73
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t,
96✔
74
                                 SColCmprWrapper* pColCmprRow, cJSON** pJson) {
75
  int32_t code = TSDB_CODE_SUCCESS;
96✔
76
  int8_t  buildDefaultCompress = 0;
96✔
77
  if (pColCmprRow->nCols <= 0) {
96!
78
    buildDefaultCompress = 1;
×
79
  }
80

81
  char*  string = NULL;
96✔
82
  cJSON* json = cJSON_CreateObject();
96✔
83
  RAW_NULL_CHECK(json);
96!
84
  cJSON* type = cJSON_CreateString("create");
96✔
85
  RAW_NULL_CHECK(type);
96!
86

87
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
96!
88
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
96✔
89
  RAW_NULL_CHECK(tableType);
96!
90
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
96!
91
  cJSON* tableName = cJSON_CreateString(name);
96✔
92
  RAW_NULL_CHECK(tableName);
96!
93
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
96!
94

95
  cJSON* columns = cJSON_CreateArray();
96✔
96
  RAW_NULL_CHECK(columns);
96!
97
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "columns", columns));
96!
98

99
  for (int i = 0; i < schemaRow->nCols; i++) {
584✔
100
    cJSON* column = cJSON_CreateObject();
488✔
101
    RAW_NULL_CHECK(column);
488!
102
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(columns, column));
488!
103
    SSchema* s = schemaRow->pSchema + i;
488✔
104
    cJSON*   cname = cJSON_CreateString(s->name);
488✔
105
    RAW_NULL_CHECK(cname);
488!
106
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "name", cname));
488!
107
    cJSON* ctype = cJSON_CreateNumber(s->type);
488✔
108
    RAW_NULL_CHECK(ctype);
488!
109
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "type", ctype));
488!
110
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
546!
111
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
58✔
112
      cJSON*  cbytes = cJSON_CreateNumber(length);
58✔
113
      RAW_NULL_CHECK(cbytes);
58!
114
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
58!
115
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
430✔
116
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
8✔
117
      cJSON*  cbytes = cJSON_CreateNumber(length);
8✔
118
      RAW_NULL_CHECK(cbytes);
8!
119
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
8!
120
    }
121
    cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
488✔
122
    RAW_NULL_CHECK(isPk);
488!
123
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "isPrimarykey", isPk));
488!
124

125
    if (pColCmprRow == NULL) {
488!
126
      continue;
×
127
    }
128

129
    uint32_t alg = 0;
488✔
130
    if (buildDefaultCompress) {
488!
131
      alg = createDefaultColCmprByType(s->type);
×
132
    } else {
133
      SColCmpr* pColCmpr = pColCmprRow->pColCmpr + i;
488✔
134
      alg = pColCmpr->alg;
488✔
135
    }
136
    const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(alg));
488✔
137
    RAW_NULL_CHECK(encode);
488!
138
    const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(alg));
488✔
139
    RAW_NULL_CHECK(compress);
488!
140
    const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(alg));
488✔
141
    RAW_NULL_CHECK(level);
488!
142

143
    cJSON* encodeJson = cJSON_CreateString(encode);
488✔
144
    RAW_NULL_CHECK(encodeJson);
488!
145
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "encode", encodeJson));
488!
146

147
    cJSON* compressJson = cJSON_CreateString(compress);
488✔
148
    RAW_NULL_CHECK(compressJson);
488!
149
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "compress", compressJson));
488!
150

151
    cJSON* levelJson = cJSON_CreateString(level);
488✔
152
    RAW_NULL_CHECK(levelJson);
488!
153
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "level", levelJson));
488!
154
  }
155

156
  cJSON* tags = cJSON_CreateArray();
96✔
157
  RAW_NULL_CHECK(tags);
96!
158
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
96!
159

160
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
290✔
161
    cJSON* tag = cJSON_CreateObject();
194✔
162
    RAW_NULL_CHECK(tag);
194!
163
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
194!
164
    SSchema* s = schemaTag->pSchema + i;
194✔
165
    cJSON*   tname = cJSON_CreateString(s->name);
194✔
166
    RAW_NULL_CHECK(tname);
194!
167
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
194!
168
    cJSON* ttype = cJSON_CreateNumber(s->type);
194✔
169
    RAW_NULL_CHECK(ttype);
194!
170
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
194!
171
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
202!
172
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
8✔
173
      cJSON*  cbytes = cJSON_CreateNumber(length);
8✔
174
      RAW_NULL_CHECK(cbytes);
8!
175
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
8!
176
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
186✔
177
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
55✔
178
      cJSON*  cbytes = cJSON_CreateNumber(length);
55✔
179
      RAW_NULL_CHECK(cbytes);
55!
180
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
55!
181
    }
182
  }
183

184
end:
96✔
185
  *pJson = json;
96✔
186
}
96✔
187

188
/*
 * Add exactly one compression attribute taken from `para` to `json`.
 * The L1 encode part is preferred; if it is zero the L2 compress part is used,
 * and if that is zero too the level part is used. When all three are zero
 * nothing is added.
 *
 * Returns 0 on success, otherwise the code set by the failing check
 * (terrno for a NULL result, TSDB_CODE_INVALID_PARA when attaching fails).
 */
static int32_t setCompressOption(cJSON* json, uint32_t para) {
  int32_t     code = 0;
  const char* key = NULL;
  const char* text = NULL;

  uint8_t encode = COMPRESS_L1_TYPE_U32(para);
  uint8_t compress = COMPRESS_L2_TYPE_U32(para);
  uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para);

  if (encode != 0) {
    key = "encode";
    text = columnEncodeStr(encode);
  } else if (compress != 0) {
    key = "compress";
    text = columnCompressStr(compress);
  } else if (level != 0) {
    key = "level";
    text = columnLevelStr(level);
  } else {
    return code;
  }

  RAW_NULL_CHECK(text);
  cJSON* value = cJSON_CreateString(text);
  RAW_NULL_CHECK(value);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, key, value));

end:
  return code;
}
221
/*
 * Deserialize an SMAlterStbReq from `alterData` and describe it as JSON.
 *
 * The object always carries "type" = "alter", "tableType" = "super", "tableName"
 * and "alterType"; further keys ("colName", "colType", "colLength", "colNewName",
 * or one compression key) depend on the alter type.
 *
 * *pJson receives the object. It is NULL when deserialization fails, and it is
 * the partially filled object when a later step fails.
 */
static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) {
  SMAlterStbReq req = {0};
  cJSON*        json = NULL;
  int32_t       code = 0;

  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) {
    goto end;
  }

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* typeJson = cJSON_CreateString("alter");
  RAW_NULL_CHECK(typeJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", typeJson));

  SName name = {0};
  RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));

  cJSON* tableTypeJson = cJSON_CreateString("super");
  RAW_NULL_CHECK(tableTypeJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableTypeJson));
  cJSON* tableNameJson = cJSON_CreateString(name.tname);
  RAW_NULL_CHECK(tableNameJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableNameJson));

  cJSON* alterTypeJson = cJSON_CreateNumber(req.alterType);
  RAW_NULL_CHECK(alterTypeJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterTypeJson));

  switch (req.alterType) {
    // Adding a field and resizing a field emit the same keys: name, type and,
    // for variable-length types, the length.
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN:
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      TAOS_FIELD* pField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pField);
      cJSON* nameJson = cJSON_CreateString(pField->name);
      RAW_NULL_CHECK(nameJson);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", nameJson));
      cJSON* colTypeJson = cJSON_CreateNumber(pField->type);
      RAW_NULL_CHECK(colTypeJson);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colTypeJson));

      bool    hasLength = true;
      int32_t length = 0;
      if (pField->type == TSDB_DATA_TYPE_BINARY || pField->type == TSDB_DATA_TYPE_VARBINARY ||
          pField->type == TSDB_DATA_TYPE_GEOMETRY) {
        length = pField->bytes - VARSTR_HEADER_SIZE;
      } else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
        length = (pField->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      } else {
        hasLength = false;
      }
      if (hasLength) {
        cJSON* lengthJson = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(lengthJson);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", lengthJson));
      }
      break;
    }

    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      SFieldWithOptions* pField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pField);
      cJSON* nameJson = cJSON_CreateString(pField->name);
      RAW_NULL_CHECK(nameJson);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", nameJson));
      cJSON* colTypeJson = cJSON_CreateNumber(pField->type);
      RAW_NULL_CHECK(colTypeJson);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colTypeJson));

      bool    hasLength = true;
      int32_t length = 0;
      if (pField->type == TSDB_DATA_TYPE_BINARY || pField->type == TSDB_DATA_TYPE_VARBINARY ||
          pField->type == TSDB_DATA_TYPE_GEOMETRY) {
        length = pField->bytes - VARSTR_HEADER_SIZE;
      } else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
        length = (pField->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
      } else {
        hasLength = false;
      }
      if (hasLength) {
        cJSON* lengthJson = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(lengthJson);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", lengthJson));
      }
      RAW_RETURN_CHECK(setCompressOption(json, pField->compress));
      break;
    }

    case TSDB_ALTER_TABLE_DROP_TAG:
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* pField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pField);
      cJSON* nameJson = cJSON_CreateString(pField->name);
      RAW_NULL_CHECK(nameJson);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", nameJson));
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      // Entry 0 holds the current name, entry 1 the new one.
      TAOS_FIELD* pOld = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pOld);
      TAOS_FIELD* pNew = taosArrayGet(req.pFields, 1);
      RAW_NULL_CHECK(pNew);
      cJSON* oldNameJson = cJSON_CreateString(pOld->name);
      RAW_NULL_CHECK(oldNameJson);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", oldNameJson));
      cJSON* newNameJson = cJSON_CreateString(pNew->name);
      RAW_NULL_CHECK(newNameJson);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", newNameJson));
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      TAOS_FIELD* pField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pField);
      cJSON* nameJson = cJSON_CreateString(pField->name);
      RAW_NULL_CHECK(nameJson);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", nameJson));
      // For this alter type the `bytes` member is what gets passed on as the compression word.
      RAW_RETURN_CHECK(setCompressOption(json, pField->bytes));
      break;
    }

    default:
      break;
  }

end:
  tFreeSMAltertbReq(&req);
  *pJson = json;
}
40✔
363

364
/*
 * Decode the SVCreateStbReq that follows the SMsgHead in `metaRsp` and turn it
 * into a create-table JSON object for a super table.
 * *pJson is written only when decoding succeeds.
 */
static void processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  SVCreateStbReq req = {0};
  // Zero-initialized like the decoders in the sibling process* functions, so the
  // cleanup at `end` never depends on tDecoderInit having filled every field.
  SDecoder       coder = {0};

  uDebug("create stable data:%p", metaRsp);
  // decode and process req; the payload starts right after the message head
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);

  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    goto end;
  }
  buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr, pJson);

end:
  uDebug("create stable return");
  tDecoderClear(&coder);
}
78✔
383

384
/*
 * Decode the SVCreateStbReq that follows the SMsgHead in `metaRsp` and build the
 * alter-table JSON from the original alter request embedded in it.
 * *pJson is written only when decoding succeeds.
 */
static void processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  SVCreateStbReq stbReq = {0};
  SDecoder       dec = {0};
  uDebug("alter stable data:%p", metaRsp);

  // The payload starts right after the message head.
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&dec, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&dec, &stbReq) >= 0) {
    buildAlterSTableJson(stbReq.alterOriData, stbReq.alterOriDataLen, pJson);
  }

  uDebug("alter stable return");
  tDecoderClear(&dec);
}
40✔
403

404
/*
 * Fill `json` with the description of one child table taken from `pCreateReq`:
 * "tableName", "using" (super table name), "tagNum" and a "tags" array whose
 * elements carry "name", "type" and "value".
 *
 * A JSON-typed tag produces a single element whose value is the text returned by
 * parseTagDatatoJson; an empty JSON tag leaves the array empty. Other tags
 * produce one element per tag value: variable-length values are converted to a
 * string, all others are stored as a number.
 *
 * Nothing is returned; on any failure the function stops and leaves `json` with
 * whatever was added up to that point.
 */
static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
  STag*   pTag = (STag*)pCreateReq->ctb.pTag;
  SArray* tagNames = pCreateReq->ctb.tagName;
  uint8_t nTags = pCreateReq->ctb.tagNum;
  SArray* pTagVals = NULL;
  char*   jsonText = NULL;
  int32_t code = 0;

  cJSON* tableName = cJSON_CreateString(pCreateReq->name);
  RAW_NULL_CHECK(tableName);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
  cJSON* using = cJSON_CreateString(pCreateReq->ctb.stbName);
  RAW_NULL_CHECK(using);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "using", using));
  cJSON* tagNumJson = cJSON_CreateNumber(nTags);
  RAW_NULL_CHECK(tagNumJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tagNum", tagNumJson));

  cJSON* tags = cJSON_CreateArray();
  RAW_NULL_CHECK(tags);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
  RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals));

  if (tTagIsJson(pTag)) {
    if (pTag->nTag == 0) {
      uError("p->nTag == 0");
      goto end;
    }
    parseTagDatatoJson(pTag, &jsonText, NULL);
    RAW_NULL_CHECK(jsonText);
    cJSON* tag = cJSON_CreateObject();
    RAW_NULL_CHECK(tag);
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
    STagVal* firstVal = taosArrayGet(pTagVals, 0);
    RAW_NULL_CHECK(firstVal);
    char* firstName = taosArrayGet(tagNames, 0);
    RAW_NULL_CHECK(firstName);
    cJSON* tname = cJSON_CreateString(firstName);
    RAW_NULL_CHECK(tname);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
    cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
    RAW_NULL_CHECK(ttype);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
    cJSON* tvalue = cJSON_CreateString(jsonText);
    RAW_NULL_CHECK(tvalue);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
  } else {
    for (int idx = 0; idx < taosArrayGetSize(pTagVals); idx++) {
      STagVal* pVal = (STagVal*)taosArrayGet(pTagVals, idx);
      RAW_NULL_CHECK(pVal);
      cJSON* tag = cJSON_CreateObject();
      RAW_NULL_CHECK(tag);
      RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
      char* tagNameStr = taosArrayGet(tagNames, idx);
      RAW_NULL_CHECK(tagNameStr);
      cJSON* tname = cJSON_CreateString(tagNameStr);
      RAW_NULL_CHECK(tname);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
      cJSON* ttype = cJSON_CreateNumber(pVal->type);
      RAW_NULL_CHECK(ttype);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));

      cJSON* tvalue = NULL;
      if (IS_VAR_DATA_TYPE(pVal->type)) {
        // VARBINARY gets room for two characters per byte plus two extra; every
        // variable-length type gets three spare bytes on top.
        int64_t bufSize = (pVal->type == TSDB_DATA_TYPE_VARBINARY) ? pVal->nData * 2 + 2 + 3 : pVal->nData + 3;
        char*   text = taosMemoryCalloc(bufSize, 1);
        RAW_NULL_CHECK(text);
        if (dataConverToStr(text, bufSize, pVal->type, pVal->pData, pVal->nData, NULL) != TSDB_CODE_SUCCESS) {
          taosMemoryFree(text);
          goto end;
        }

        // cJSON_CreateString's result is the only thing kept; the scratch buffer is released right away.
        tvalue = cJSON_CreateString(text);
        taosMemoryFree(text);
        RAW_NULL_CHECK(tvalue);
      } else {
        double number = 0;
        GET_TYPED_DATA(number, double, pVal->type, &pVal->i64);
        tvalue = cJSON_CreateNumber(number);
        RAW_NULL_CHECK(tvalue);
      }

      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
    }
  }

end:
  taosMemoryFree(jsonText);
  taosArrayDestroy(pTagVals);
}
156✔
503

504
/*
 * Build the JSON for creating child tables.
 *
 * The top-level object has "type" = "create", "tableType" = "child", the fields
 * of the first request (via buildChildElement) and a "createList" array. The
 * array is filled with one object per request, the first one included, only
 * when there is more than one request.
 *
 * *pJson receives the object; it is NULL when the object could not be created
 * and partially filled when a later step fails.
 */
static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) {
  int32_t code = 0;
  cJSON*  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);

  cJSON* typeJson = cJSON_CreateString("create");
  RAW_NULL_CHECK(typeJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", typeJson));

  cJSON* tableTypeJson = cJSON_CreateString("child");
  RAW_NULL_CHECK(tableTypeJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableTypeJson));

  buildChildElement(json, pCreateReq);

  cJSON* list = cJSON_CreateArray();
  RAW_NULL_CHECK(list);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "createList", list));

  if (nReqs > 1) {
    for (int idx = 0; idx < nReqs; idx++) {
      cJSON* entry = cJSON_CreateObject();
      RAW_NULL_CHECK(entry);
      buildChildElement(entry, pCreateReq + idx);
      RAW_FALSE_CHECK(tmqAddJsonArrayItem(list, entry));
    }
  }

end:
  *pJson = json;
}
123✔
532

533
/*
 * Decode the SVCreateTbBatchReq that follows the SMsgHead in `metaRsp` and build
 * the matching create-table JSON. The type of the first request selects the
 * builder: child tables use the whole batch, a normal table uses the first
 * request only. *pJson is left untouched when decoding fails, when the batch is
 * empty, or when the first request has any other type.
 */
static void processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  SDecoder           decoder = {0};
  SVCreateTbBatchReq batch = {0};

  uDebug("create table data:%p", metaRsp);
  // The payload starts right after the message head.
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, payload, payloadLen);

  if (tDecodeSVCreateTbBatchReq(&decoder, &batch) >= 0 && batch.nReqs > 0) {
    SVCreateTbReq* first = batch.pReqs;
    if (first->type == TSDB_CHILD_TABLE) {
      buildCreateCTableJson(batch.pReqs, batch.nReqs, pJson);
    } else if (first->type == TSDB_NORMAL_TABLE) {
      buildCreateTableJson(&first->ntb.schemaRow, NULL, first->name, first->uid, TSDB_NORMAL_TABLE, &first->colCmpr,
                           pJson);
    }
  }

  uDebug("create table return");
  tDeleteSVCreateTbBatchReq(&batch);
  tDecoderClear(&decoder);
}
131✔
562

563
/*
 * Decode every auto-create-table request carried by `rsp` and render them as one
 * unformatted JSON string.
 *
 * rsp     data response; createTableReq / createTableLen hold the encoded
 *         requests and their lengths, createTableNum their count
 * string  receives the text produced by cJSON_PrintUnformatted. It is written
 *         only when every request decodes and is a child table; on every other
 *         path *string keeps the caller's value.
 *         NOTE(review): the closing debug log reads *string on all paths, so the
 *         caller is presumably expected to initialize it (e.g. to NULL) - confirm.
 */
static void processAutoCreateTable(SMqDataRsp* rsp, char** string) {
  SDecoder*      decoder = NULL;
  SVCreateTbReq* pCreateReq = NULL;
  int32_t        code = 0;
  uDebug("auto create table data:%p", rsp);
  if (rsp->createTableNum <= 0) {
    uError("processAutoCreateTable rsp->createTableNum <= 0");
    goto end;
  }

  // One decoder per request: each one is cleared individually in the cleanup
  // loop after the JSON has been built.
  decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
  RAW_NULL_CHECK(decoder);
  pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
  RAW_NULL_CHECK(pCreateReq);

  // loop to create table
  for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) {
    // decode
    void** data = taosArrayGet(rsp->createTableReq, iReq);
    RAW_NULL_CHECK(data);
    int32_t* len = taosArrayGet(rsp->createTableLen, iReq);
    RAW_NULL_CHECK(len);
    tDecoderInit(&decoder[iReq], *data, *len);
    if (tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq) < 0) {
      goto end;
    }

    if (pCreateReq[iReq].type != TSDB_CHILD_TABLE) {
      uError("processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE");
      goto end;
    }
  }
  cJSON* pJson = NULL;
  buildCreateCTableJson(pCreateReq, rsp->createTableNum, &pJson);
  *string = cJSON_PrintUnformatted(pJson);
  cJSON_Delete(pJson);

end:
  // *string is NULL when printing failed (and on early exits if the caller passed
  // NULL in); handing NULL to %s is undefined behavior, so log a placeholder.
  uDebug("auto created table return, sql json:%s", (*string != NULL) ? *string : "(null)");
  // Both arrays come from calloc, so entries that were never decoded are still
  // zero-filled when they reach this loop.
  for (int i = 0; decoder && pCreateReq && i < rsp->createTableNum; i++) {
    tDecoderClear(&decoder[i]);
    taosMemoryFreeClear(pCreateReq[i].comment);
    if (pCreateReq[i].type == TSDB_CHILD_TABLE) {
      taosArrayDestroy(pCreateReq[i].ctb.tagName);
    }
  }
  taosMemoryFree(decoder);
  taosMemoryFree(pCreateReq);
}
10✔
612

613
/*
 * Decode an alter-table meta response and describe it as a cJSON object.
 *
 * metaRsp - response whose payload (after the SMsgHead prefix) is an encoded SVAlterTbReq.
 * pJson   - out: always assigned. NULL when decoding fails or the root object
 *           cannot be created; otherwise the object built so far. Note that a
 *           failure after the root object exists still hands back the
 *           partially filled object.
 *
 * Every object gets "type", "tableType", "tableName" and "alterType"; the
 * remaining keys depend on vAlterTbReq.action.
 */
static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  cJSON*       json = NULL;
  int32_t      code = 0;  // written by the RAW_* macros; not reported to the caller

  uDebug("alter table data:%p", metaRsp);
  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
    uError("tDecodeSVAlterTbReq error");
    goto end;
  }

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("alter");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
  // Only the tag-value updates are reported as "child"; every other action is "normal".
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
                                                vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL
                                            ? "child"
                                            : "normal");
  RAW_NULL_CHECK(tableType);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
  RAW_NULL_CHECK(tableName);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
  RAW_NULL_CHECK(alterType);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));

  switch (vAlterTbReq.action) {
    // Both add-column flavours emit the same keys; the compress variant appends its option last.
    case TSDB_ALTER_TABLE_ADD_COLUMN:
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      RAW_NULL_CHECK(colType);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));

      // "colLength" is only emitted for variable-length types, without the var-string header.
      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
          vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(cbytes);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(cbytes);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
      }
      if (vAlterTbReq.action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION) {
        RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
      RAW_NULL_CHECK(colType);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
      if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_VARBINARY ||
          vAlterTbReq.colModType == TSDB_DATA_TYPE_GEOMETRY) {
        int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(cbytes);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
      } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(cbytes);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
      RAW_NULL_CHECK(colNewName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
      RAW_NULL_CHECK(tagName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", tagName));

      bool isNull = vAlterTbReq.isNull;
      if (!isNull && vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        // A JSON tag with no value (or no entries) is reported as null.
        // The NULL guard avoids dereferencing a missing tag payload.
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag == NULL || jsonTag->nTag == 0) isNull = true;
      }
      if (!isNull) {
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          if (!tTagIsJson(vAlterTbReq.pTagVal)) {
            uError("processAlterTable isJson false");
            goto end;
          }
          parseTagDatatoJson(vAlterTbReq.pTagVal, &buf, NULL);
          if (buf == NULL) {
            uError("parseTagDatatoJson failed, buf == NULL");
            goto end;
          }
        } else {
          // Buffer sizing kept exactly as before; varbinary gets twice the
          // value length plus extra room (presumably for a hex rendering).
          int64_t bufSize = 0;
          if (vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY) {
            bufSize = vAlterTbReq.nTagVal * 2 + 2 + 3;
          } else {
            bufSize = vAlterTbReq.nTagVal + 3;
          }
          buf = taosMemoryCalloc(bufSize, 1);
          RAW_NULL_CHECK(buf);
          if (dataConverToStr(buf, bufSize, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL) !=
              TSDB_CODE_SUCCESS) {
            taosMemoryFree(buf);
            goto end;
          }
        }

        // cJSON_CreateString copies the text, so buf can be released right away.
        cJSON* colValue = cJSON_CreateString(buf);
        taosMemoryFree(buf);
        RAW_NULL_CHECK(colValue);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValue", colValue));
      }

      cJSON* isNullCJson = cJSON_CreateBool(isNull);
      RAW_NULL_CHECK(isNullCJson);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValueNull", isNullCJson));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: {
      int32_t nTags = taosArrayGetSize(vAlterTbReq.pMultiTag);
      if (nTags <= 0) {
        uError("processAlterTable parse multi tags error");
        goto end;
      }

      cJSON* tags = cJSON_CreateArray();
      RAW_NULL_CHECK(tags);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));

      for (int32_t i = 0; i < nTags; i++) {
        cJSON* member = cJSON_CreateObject();
        RAW_NULL_CHECK(member);
        RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, member));

        SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
        // Checked like every other taosArrayGet() in this file instead of being dereferenced blindly.
        RAW_NULL_CHECK(pTagVal);
        cJSON* tagName = cJSON_CreateString(pTagVal->tagName);
        RAW_NULL_CHECK(tagName);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colName", tagName));

        // JSON tags are rejected in the multi-tag form.
        if (pTagVal->tagType == TSDB_DATA_TYPE_JSON) {
          uError("processAlterTable isJson false");
          goto end;
        }
        bool isNull = pTagVal->isNull;
        if (!isNull) {
          int64_t bufSize = 0;
          if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
            bufSize = pTagVal->nTagVal * 2 + 2 + 3;
          } else {
            bufSize = pTagVal->nTagVal + 3;
          }
          char* buf = taosMemoryCalloc(bufSize, 1);
          RAW_NULL_CHECK(buf);
          if (dataConverToStr(buf, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL) !=
              TSDB_CODE_SUCCESS) {
            taosMemoryFree(buf);
            goto end;
          }
          cJSON* colValue = cJSON_CreateString(buf);
          taosMemoryFree(buf);
          RAW_NULL_CHECK(colValue);
          RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValue", colValue));
        }
        cJSON* isNullCJson = cJSON_CreateBool(isNull);
        RAW_NULL_CHECK(isNullCJson);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValueNull", isNullCJson));
      }
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      break;
    }
    default:
      break;
  }

end:
  uDebug("alter table return");
  // The multi-tag array is the only decoded member released explicitly here.
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
    taosArrayDestroy(vAlterTbReq.pMultiTag);
  }
  tDecoderClear(&decoder);
  *pJson = json;
}
24✔
851

852
/*
 * Turn a drop-super-table meta response into a cJSON object of the form
 * {"type":"drop","tableType":"super","tableName":<name>}.
 *
 * metaRsp - response whose payload (after the SMsgHead prefix) is an encoded SVDropStbReq.
 * pJson   - out: always assigned; NULL if decoding or root creation failed,
 *           otherwise the object built so far.
 */
static void processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  int32_t      code = 0;  // written by the RAW_* macros; not reported to the caller
  cJSON*       root = NULL;
  SVDropStbReq dropReq = {0};
  SDecoder     dec = {0};

  uDebug("processDropSTable data:%p", metaRsp);

  // The encoded request starts right behind the message head.
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&dec, payload, payloadLen);
  if (tDecodeSVDropStbReq(&dec, &dropReq) < 0) {
    uError("tDecodeSVDropStbReq failed");
    goto end;
  }

  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);

  cJSON* typeItem = cJSON_CreateString("drop");
  RAW_NULL_CHECK(typeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "type", typeItem));

  cJSON* kindItem = cJSON_CreateString("super");
  RAW_NULL_CHECK(kindItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableType", kindItem));

  cJSON* nameItem = cJSON_CreateString(dropReq.name);
  RAW_NULL_CHECK(nameItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableName", nameItem));

end:
  uDebug("processDropSTable return");
  tDecoderClear(&dec);
  *pJson = root;
}
8✔
886
/*
 * Turn a delete meta response into a cJSON object of the form
 * {"type":"delete","sql":"delete from `<table>` where `<ts>` >= <skey> and `<ts>` <= <ekey>"}.
 *
 * metaRsp - response whose payload (after the SMsgHead prefix) is an encoded SDeleteRes.
 * pJson   - out: always assigned; NULL if decoding or root creation failed,
 *           otherwise the object built so far.
 *
 * The SQL text is sized with a measuring snprintf() pass instead of a fixed
 * 256-byte buffer, so long table or column names can no longer be silently
 * truncated into a malformed statement.
 */
static void processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  cJSON*     json = NULL;
  char*      sql = NULL;
  int32_t    code = 0;  // written by the RAW_* macros; not reported to the caller

  uDebug("processDeleteTable data:%p", metaRsp);
  // decode and process req
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);

  tDecoderInit(&coder, data, len);
  if (tDecodeDeleteRes(&coder, &req) < 0) {
    uError("tDecodeDeleteRes failed");
    goto end;
  }

  // First pass: measure the exact length (terminator excluded).
  int sqlLen = snprintf(NULL, 0, "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
                        req.tsColName, req.skey, req.tsColName, req.ekey);
  if (sqlLen < 0) {
    uError("processDeleteTable format sql failed");
    goto end;
  }
  sql = taosMemoryCalloc((size_t)sqlLen + 1, 1);
  RAW_NULL_CHECK(sql);
  // Second pass: write the statement into the exactly sized buffer.
  (void)snprintf(sql, (size_t)sqlLen + 1, "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                 req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("delete");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
  cJSON* sqlJson = cJSON_CreateString(sql);  // copies the text; sql is released below
  RAW_NULL_CHECK(sqlJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "sql", sqlJson));

end:
  uDebug("processDeleteTable return");
  taosMemoryFree(sql);
  tDecoderClear(&coder);
  *pJson = json;
}
3✔
922

923
/*
 * Turn a drop-table batch meta response into a cJSON object of the form
 * {"type":"drop","tableNameList":[<name>, ...]}.
 *
 * metaRsp - response whose payload (after the SMsgHead prefix) is an encoded SVDropTbBatchReq.
 * pJson   - out: always assigned; NULL if decoding or root creation failed,
 *           otherwise the object built so far.
 */
static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  int32_t          code = 0;  // written by the RAW_* macros; not reported to the caller
  cJSON*           root = NULL;
  SVDropTbBatchReq batch = {0};
  SDecoder         dec = {0};

  uDebug("processDropTable data:%p", metaRsp);

  // The encoded batch starts right behind the message head.
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&dec, payload, payloadLen);
  if (tDecodeSVDropTbBatchReq(&dec, &batch) < 0) {
    uError("tDecodeSVDropTbBatchReq failed");
    goto end;
  }

  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);

  cJSON* typeItem = cJSON_CreateString("drop");
  RAW_NULL_CHECK(typeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "type", typeItem));

  cJSON* names = cJSON_CreateArray();
  RAW_NULL_CHECK(names);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableNameList", names));

  // One array entry per dropped table, in request order.
  for (int32_t idx = 0; idx < batch.nReqs; ++idx) {
    SVDropTbReq* pOne = &batch.pReqs[idx];
    cJSON*       nameItem = cJSON_CreateString(pOne->name);
    RAW_NULL_CHECK(nameItem);
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(names, nameItem));
  }

end:
  uDebug("processDropTable return");
  tDecoderClear(&dec);
  *pJson = root;
}
5✔
960

961
/*
 * Replay a raw "create super table" meta message against the connection `taos`.
 *
 * taos    - connection handle; dereferenced as an int64_t id for buildRequest().
 * meta    - raw message: an SMsgHead followed by an encoded SVCreateStbReq.
 * metaLen - total length of `meta` in bytes, head included.
 *
 * Returns TSDB_CODE_SUCCESS or an error code: TSDB_CODE_PAR_DB_NOT_SPECIFIED
 * when the request has no database, TSDB_CODE_INVALID_PARA on decode or
 * serialize failure, otherwise the failing helper's code or pRequest->code.
 */
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateStbReq req = {0};
  // Must be zero-initialized: tDecoderClear() at `end` also runs on paths that
  // bail out before tDecoderInit() (buildRequest failure, missing database).
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // A request without per-column compression info falls back to type-based defaults.
  int8_t           createDefaultCompress = 0;
  SColCmprWrapper* p = &req.colCmpr;
  if (p->nCols == 0) {
    createDefaultCompress = 1;
  }
  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
  RAW_NULL_CHECK(pReq.pColumns);
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema*          pSchema = req.schemaRow.pSchema + i;
    SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);

    if (createDefaultCompress) {
      field.compress = createDefaultColCmprByType(pSchema->type);
    } else {
      // NOTE(review): assumes colCmpr has an entry for every schema column — confirm against the encoder.
      SColCmpr* pCmp = &req.colCmpr.pColCmpr[i];
      field.compress = pCmp->alg;
    }
    RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field));
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  RAW_NULL_CHECK(pReq.pTags);
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
    RAW_NULL_CHECK(taosArrayPush(pReq.pTags, &field));
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = processSuid(req.suid, pRequest->pDb);
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;

  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // First call with a NULL buffer only measures the serialized size.
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  if (pCmdMsg.msgLen <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  if (tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    taosMemoryFree(pCmdMsg.pMsg);
    goto end;
  }

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest

  taosMemoryFree(pCmdMsg.pMsg);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // Drop any cached meta for this table once the create succeeded.
    SCatalog* pCatalog = NULL;
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  return code;
}
1070

1071
/*
 * Replay a raw "drop super table" meta message against the connection `taos`.
 *
 * taos    - connection handle; dereferenced as an int64_t id for buildRequest().
 * meta    - raw message: an SMsgHead followed by an encoded SVDropStbReq.
 * metaLen - total length of `meta` in bytes, head included.
 *
 * Returns TSDB_CODE_SUCCESS when the table was dropped or does not exist,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
 * TSDB_CODE_INVALID_PARA on decode, name or serialize failure, otherwise the
 * failing helper's code or pRequest->code.
 */
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;
  SDecoder     coder = {0};
  SVDropStbReq vDropReq = {0};
  SMDropStbReq mDropReq = {0};

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (pRequest->pDb == NULL) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // The encoded request starts right behind the message head.
  void*   payload = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t payloadLen = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, payload, payloadLen);
  if (tDecodeSVDropStbReq(&coder, &vDropReq) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};

  // Look the table up locally: the suid sent to the mnode is the local uid, not the one in the message.
  SName lookupName = {0};
  toName(pRequest->pTscObj->acctId, pRequest->pDb, vDropReq.name, &lookupName);
  STableMeta* pTableMeta = NULL;
  code = catalogGetTableMeta(pCatalog, &conn, &lookupName, &pTableMeta);
  if (code != TSDB_CODE_SUCCESS) {
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      // Nothing to drop counts as success.
      code = TSDB_CODE_SUCCESS;
      taosMemoryFreeClear(pTableMeta);
    }
    goto end;
  }
  mDropReq.suid = pTableMeta->uid;
  taosMemoryFreeClear(pTableMeta);

  // build drop stable
  mDropReq.igNotExists = true;
  mDropReq.source = TD_REQ_FROM_TAOX;

  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, vDropReq.name,
         vDropReq.suid, mDropReq.suid);

  STscObj* pTscObj = pRequest->pTscObj;
  SName    fullName = {0};
  toName(pTscObj->acctId, pRequest->pDb, vDropReq.name, &fullName);
  if (tNameExtractFullName(&fullName, mDropReq.name) != 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  SCmdMsgInfo cmd = {0};
  cmd.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  cmd.msgType = TDMT_MND_DROP_STB;
  // First call with a NULL buffer only measures the serialized size.
  cmd.msgLen = tSerializeSMDropStbReq(NULL, 0, &mDropReq);
  if (cmd.msgLen <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  cmd.pMsg = taosMemoryMalloc(cmd.msgLen);
  RAW_NULL_CHECK(cmd.pMsg);
  if (tSerializeSMDropStbReq(cmd.pMsg, cmd.msgLen, &mDropReq) <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    taosMemoryFree(cmd.pMsg);
    goto end;
  }

  SQuery query = {0};
  query.execMode = QUERY_EXEC_MODE_RPC;
  query.pCmdMsg = &cmd;
  query.msgType = cmd.msgType;
  query.stableQuery = true;

  launchQueryImpl(pRequest, &query, true, NULL);  // result is reported through pRequest->code
  taosMemoryFree(cmd.pMsg);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // Drop the cached meta; a failure here is returned to the caller.
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &fullName));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  return code;
}
1168

1169
typedef struct SVgroupCreateTableBatch {
1170
  SVCreateTbBatchReq req;
1171
  SVgroupInfo        info;
1172
  char               dbName[TSDB_DB_NAME_LEN];
1173
} SVgroupCreateTableBatch;
1174

1175
static void destroyCreateTbReqBatch(void* data) {
133✔
1176
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
133✔
1177
  taosArrayDestroy(pTbBatch->req.pArray);
133✔
1178
}
133✔
1179

1180
/*
 * Replay a raw "create table" batch meta message against the connection `taos`.
 *
 * taos    - connection handle; dereferenced as an int64_t id for buildRequest().
 * meta    - raw message: an SMsgHead followed by an encoded SVCreateTbBatchReq.
 * metaLen - total length of `meta` in bytes, head included.
 *
 * Each request is routed to its vgroup; child tables get their super-table uid
 * and tag column ids remapped to the local schema. Child tables whose super
 * table does not exist locally are skipped. The grouped requests are then
 * sent as one TDMT_VND_CREATE_TABLE query.
 *
 * Returns TSDB_CODE_SUCCESS (also when nothing was left to create) or an error
 * code from the failing step / pRequest->code.
 */
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;
  // Keeps rebuilt tags alive until the query has run; released at `end`.
  SArray*            pTagList = taosArrayInit(0, POINTER_BYTES);
  RAW_NULL_CHECK(pTagList);
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
    // change tag cid to new cid
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      STableMeta* pTableMeta = NULL;
      SName       sName = {0};
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        // The super table is unknown locally: skip this child table.
        code = TSDB_CODE_SUCCESS;
        taosMemoryFreeClear(pTableMeta);
        continue;
      }

      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
      pCreateReq->ctb.suid = pTableMeta->uid;

      SArray* pTagVals = NULL;
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
      if (code != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pTableMeta);
        goto end;
      }

      // Match each tag name against the local tag schema and adopt the local column id.
      bool rebuildTag = false;
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
        if (tName == NULL) {
          continue;
        }
        // Tag columns follow the regular columns in pTableMeta->schema.
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
          SSchema* tag = &pTableMeta->schema[j];
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
            if (pTagVal) {
              if (pTagVal->cid != tag->colId) {
                pTagVal->cid = tag->colId;
                rebuildTag = true;
              }
            } else {
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
            }
          }
        }
      }
      taosMemoryFreeClear(pTableMeta);
      if (rebuildTag) {
        STag* ppTag = NULL;
        code = tTagNew(pTagVals, 1, false, &ppTag);
        taosArrayDestroy(pTagVals);
        pTagVals = NULL;
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        if (NULL == taosArrayPush(pTagList, &ppTag)) {
          // Report the failure: `code` is still SUCCESS from tTagNew() at this point.
          code = terrno;
          tTagFree(ppTag);
          goto end;
        }
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
      }
      taosArrayDestroy(pTagVals);
    }
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // Until the batch is stored in the hash map nobody else owns pArray,
      // so release it here when a later step fails.
      if (taosArrayPush(tBatch.req.pArray, pCreateReq) == NULL) {
        code = terrno;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      tBatch.req.source = TD_REQ_FROM_TAOX;
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != 0) {
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
    }
  }

  // Every request was skipped: nothing to send.
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  // NOTE(review): pBufArray is not released here if a step before
  // rewriteToVnodeModifyOpStmt() fails — confirm who owns it on those paths.
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_NULL_CHECK(pQuery->pRoot);

  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteSVCreateTbBatchReq(&req);

  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  taosArrayDestroyP(pTagList, taosMemoryFree);
  return code;
}
1354

1355
typedef struct SVgroupDropTableBatch {
1356
  SVDropTbBatchReq req;
1357
  SVgroupInfo      info;
1358
  char             dbName[TSDB_DB_NAME_LEN];
1359
} SVgroupDropTableBatch;
1360

1361
static void destroyDropTbReqBatch(void* data) {
3✔
1362
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
3✔
1363
  taosArrayDestroy(pTbBatch->req.pArray);
3✔
1364
}
3✔
1365

1366
/*
 * Apply a serialized drop-table batch (raw meta) to the database selected on this connection.
 *
 * The payload that follows the SMsgHead is decoded into individual drop requests. Every request
 * is forced to igNotExists, tables the catalog reports as TSDB_CODE_PAR_TABLE_NOT_EXIST are
 * skipped, and the remaining requests get their suid replaced with the suid found in the local
 * table meta. The requests are then grouped per vgroup and sent as one TDMT_VND_DROP_TABLE query.
 *
 * taos     connection handle; dereferenced as an int64_t id that is handed to buildRequest
 * meta     raw message: SMsgHead followed by an encoded SVDropTbBatchReq
 * metaLen  total length of meta in bytes, header included
 *
 * Returns TSDB_CODE_SUCCESS (also when no table is left to drop) or the first error encountered.
 */
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));

  // vgId -> SVgroupDropTableBatch (stored by value); the free callback releases each batch's array
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop over the tables to drop and group them by vgroup
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;
    //    pDropReq->suid = processSuid(pDropReq->suid, pRequest->pDb);

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      // the table is already absent locally: nothing to drop, not an error
      code = TSDB_CODE_SUCCESS;
      taosMemoryFreeClear(pTableMeta);
      continue;
    }
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
    // the suid carried in the message is replaced by the one from the local table meta
    tb_uid_t oldSuid = pDropReq->suid;
    pDropReq->suid = pTableMeta->suid;
    taosMemoryFreeClear(pTableMeta);
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
           pDropReq->suid);

    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // Until the batch is stored in the hash nobody else owns the array, so it has to be
      // released here when either of the next two steps fails.
      if (taosArrayPush(tBatch.req.pArray, pDropReq) == NULL) {
        code = terrno;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != 0) {
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
    }
  }

  if (taosHashGetSize(pVgroupHashmap) == 0) {
    // every table was skipped; code is still TSDB_CODE_SUCCESS
    goto end;
  }
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }
  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}
1477

1478
/*
 * Replay a serialized delete result (raw meta) as a DELETE statement on this connection.
 *
 * The payload after the SMsgHead is decoded into an SDeleteRes, turned into
 * "delete from `<table>` where `<ts>` >= skey and `<ts>` <= ekey" and executed through
 * taosQueryImpl. TSDB_CODE_PAR_TABLE_NOT_EXIST and TSDB_CODE_PAR_GET_META_ERROR from the
 * query are treated as success.
 *
 * taos     connection handle
 * meta     raw message: SMsgHead followed by an encoded SDeleteRes
 * metaLen  total length of meta in bytes, header included
 *
 * Returns TSDB_CODE_SUCCESS or an error code; TSDB_CODE_INVALID_PARA when the payload cannot be
 * decoded or the generated statement does not fit into the local SQL buffer.
 */
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  char       sql[256] = {0};
  int32_t    code = TSDB_CODE_SUCCESS;

  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeDeleteRes(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  int sqlLen = snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                        req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  if (sqlLen < 0 || (size_t)sqlLen >= sizeof(sql)) {
    // Never execute a truncated statement: a WHERE clause that was cut off could still parse
    // and delete more rows than the original request asked for.
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
  RAW_NULL_CHECK(res);
  SRequestObj* pRequest = (SRequestObj*)res;
  code = pRequest->code;
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
    // a missing table means there is nothing to delete
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);

end:
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));
  tDecoderClear(&coder);
  return code;
}
1512

1513
/*
 * Replay a serialized alter-table request (raw meta) on the database selected on this connection.
 *
 * The payload after the SMsgHead is decoded into an SVAlterTbReq. Requests whose action is
 * TSDB_ALTER_TABLE_UPDATE_OPTIONS are ignored (success is returned). Any other request is
 * re-encoded with source = TD_REQ_FROM_TAOX, wrapped in an SMsgHead addressed to the vgroup that
 * owns the table, and sent as a TDMT_VND_ALTER_TABLE query. TSDB_CODE_TDB_TABLE_NOT_EXIST from
 * the query is mapped to success.
 *
 * taos     connection handle; dereferenced as an int64_t id that is handed to buildRequest
 * meta     raw message: SMsgHead followed by an encoded SVAlterTbReq
 * metaLen  total length of meta in bytes, header included
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVAlterTbReq   req = {0};
  SDecoder       dcoder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);
  if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
  pArray = taosArrayInit(1, sizeof(void*));
  RAW_NULL_CHECK(pArray);

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  RAW_NULL_CHECK(pVgData);
  pVgData->vg = pInfo;

  int tlen = 0;
  req.source = TD_REQ_FROM_TAOX;
  tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
  if (code != 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tlen += sizeof(SMsgHead);
  void* pMsg = taosMemoryMalloc(tlen);
  RAW_NULL_CHECK(pMsg);
  // Attach the buffer to pVgData right away so the cleanup at `end` releases it on every
  // failure path below, including an encode failure.
  pVgData->pData = pMsg;
  pVgData->size = tlen;
  pVgData->numOfTables = 1;

  ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
  ((SMsgHead*)pMsg)->contLen = htonl(tlen);
  void*    pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  SEncoder coder = {0};
  tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
  code = tEncodeSVAlterTbReq(&coder, &req);
  tEncoderClear(&coder);
  if (code != 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));

  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code || NULL == pQuery) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);

  // pArray (and the pVgData it references) was handed to pQuery above; clear the local
  // pointers so the cleanup at `end` does not release them a second time
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
    code = TSDB_CODE_SUCCESS;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
    if (pRes->res != NULL) {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
end:
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&dcoder);
  qDestroyQuery(pQuery);
  return code;
}
1627

1628
/* Same as taos_write_raw_block_with_fields_with_reqid, called with a request id of 0. */
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
                                     int numFields) {
  const int64_t reqid = 0;
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, reqid);
}
1632

1633
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
1✔
1634
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
1635
  if (!taos || !pData || !tbname) {
1!
1636
    return TSDB_CODE_INVALID_PARA;
×
1637
  }
1638
  int32_t     code = TSDB_CODE_SUCCESS;
1✔
1639
  STableMeta* pTableMeta = NULL;
1✔
1640
  SQuery*     pQuery = NULL;
1✔
1641
  SHashObj*   pVgHash = NULL;
1✔
1642

1643
  SRequestObj* pRequest = NULL;
1✔
1644
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
1!
1645

1646
  uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
1!
1647
         rows, pData, tbname, fields, numFields);
1648

1649
  pRequest->syncQuery = true;
1✔
1650
  if (!pRequest->pDb) {
1!
1651
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1652
    goto end;
×
1653
  }
1654

1655
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
1✔
1656
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
1✔
1657
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
1✔
1658

1659
  struct SCatalog* pCatalog = NULL;
1✔
1660
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
1!
1661

1662
  SRequestConnInfo conn = {0};
1✔
1663
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
1✔
1664
  conn.requestId = pRequest->requestId;
1✔
1665
  conn.requestObjRefId = pRequest->self;
1✔
1666
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
1✔
1667

1668
  SVgroupInfo vgData = {0};
1✔
1669
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
1!
1670
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
1!
1671
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
1!
1672
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
1✔
1673
  RAW_NULL_CHECK(pVgHash);
1!
1674
  RAW_RETURN_CHECK(
1!
1675
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1676
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
1!
1677
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
1!
1678

1679
  launchQueryImpl(pRequest, pQuery, true, NULL);
1✔
1680
  code = pRequest->code;
1✔
1681

1682
end:
1✔
1683
  uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code));
1!
1684
  taosMemoryFreeClear(pTableMeta);
1!
1685
  qDestroyQuery(pQuery);
1✔
1686
  destroyRequest(pRequest);
1✔
1687
  taosHashCleanup(pVgHash);
1✔
1688
  return code;
1✔
1689
}
1690

1691
/* Same as taos_write_raw_block_with_reqid, called with a request id of 0. */
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  const int64_t reqid = 0;
  return taos_write_raw_block_with_reqid(taos, rows, pData, tbname, reqid);
}
1694

1695
/*
 * Write one raw data block into table `tbname` of the connection's current database. No field
 * descriptions are supplied: rawBlockBindData is called with NULL fields and a count of 0.
 *
 * taos    connection handle (must not be NULL)
 * rows    only logged by this function
 * pData   raw block (must not be NULL)
 * tbname  target table name (must not be NULL)
 * reqid   request id handed to buildRequest
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL taos/pData/tbname,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED when no database is selected, or the failing step's error code.
 */
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
  if (taos == NULL || pData == NULL || tbname == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;
  SQuery*      pQuery = NULL;
  STableMeta*  pTableMeta = NULL;
  SHashObj*    pVgHash = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));

  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);

  pRequest->syncQuery = true;
  if (pRequest->pDb == NULL) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // fully qualified name of the target table
  SName tbFullName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
  tstrncpy(tbFullName.dbname, pRequest->pDb, sizeof(tbFullName.dbname));
  tstrncpy(tbFullName.tname, tbname, sizeof(tbFullName.tname));

  struct SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));

  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};

  // resolve the owning vgroup and the table meta, then bind the block
  SVgroupInfo vgInfo = {0};
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &tbFullName, &vgInfo));
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &tbFullName, &pTableMeta));
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));

  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgHash);
  RAW_RETURN_CHECK(
      taosHashPut(pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)));
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false));
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  taosMemoryFreeClear(pTableMeta);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  taosHashCleanup(pVgHash);
  return code;
}
1750

1751
static void* getRawDataFromRes(void* pRetrieve) {
128✔
1752
  void* rawData = NULL;
128✔
1753
  // deal with compatibility
1754
  if (*(int64_t*)pRetrieve == 0) {
128!
1755
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
1756
  } else if (*(int64_t*)pRetrieve == 1) {
128!
1757
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
128✔
1758
  }
1759
  return rawData;
128✔
1760
}
1761

1762
/*
 * Decode every create-table request carried in `rsp` and index it by table name.
 *
 * rsp       response whose createTableReq / createTableLen arrays hold createTableNum encoded
 *           SVCreateTbReq messages
 * pHashObj  destination hash: table name -> SVCreateTbReq stored by value. The first request
 *           seen for a name wins; later duplicates are destroyed.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_MSG when a request is not for a child table, or the
 * error of the failing step. Entries inserted before a failure stay in pHashObj.
 */
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
  // find schema data info
  int32_t       code = 0;
  SVCreateTbReq pCreateReq = {0};
  SDecoder      decoderTmp = {0};

  for (int j = 0; j < rsp->createTableNum; j++) {
    void** dataTmp = taosArrayGet(rsp->createTableReq, j);
    RAW_NULL_CHECK(dataTmp);
    int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j);
    RAW_NULL_CHECK(lenTmp);

    tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));

    if (pCreateReq.type != TSDB_CHILD_TABLE) {
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }
    if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL) {
      RAW_RETURN_CHECK(
          taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq)));
    } else {
      tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
    }
    // The request now either lives in the hash (by-value copy) or has been destroyed. Forget
    // the local copy in both cases, so a failure in a later iteration cannot destroy a request
    // that the hash still references.
    pCreateReq = (SVCreateTbReq){0};

    tDecoderClear(&decoderTmp);
  }
  return 0;

end:
  tDecoderClear(&decoderTmp);
  tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
  return code;
}
1798

1799
typedef enum {
1800
  WRITE_RAW_INIT_START = 0,
1801
  WRITE_RAW_INIT_OK,
1802
  WRITE_RAW_INIT_FAIL,
1803
} WRITE_RAW_INIT_STATUS;
1804

1805
static SHashObj* writeRawCache = NULL;
1806
static int8_t    initFlag = 0;
1807
static int8_t    initedFlag = WRITE_RAW_INIT_START;
1808

1809
typedef struct {
1810
  SHashObj* pVgHash;
1811
  SHashObj* pNameHash;
1812
  SHashObj* pMetaHash;
1813
} rawCacheInfo;
1814

1815
typedef struct {
1816
  SVgroupInfo vgInfo;
1817
  int64_t     uid;
1818
  int64_t     suid;
1819
} tbInfo;
1820

1821
static void tmqFreeMeta(void* data) {
41✔
1822
  STableMeta* pTableMeta = *(STableMeta**)data;
41✔
1823
  taosMemoryFree(pTableMeta);
41✔
1824
}
41✔
1825

1826
static void freeRawCache(void* data) {
×
1827
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
1828
  taosHashCleanup(pRawCache->pMetaHash);
×
1829
  taosHashCleanup(pRawCache->pNameHash);
×
1830
  taosHashCleanup(pRawCache->pVgHash);
×
1831
}
×
1832

1833
static int32_t initRawCacheHash() {
15✔
1834
  if (writeRawCache == NULL) {
15!
1835
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
15✔
1836
    if (writeRawCache == NULL) {
15!
1837
      return terrno;
×
1838
    }
1839
    taosHashSetFreeFp(writeRawCache, freeRawCache);
15✔
1840
  }
1841
  return 0;
15✔
1842
}
1843

1844
/*
 * Decide whether the cached table meta is stale with respect to an incoming raw block.
 *
 * rawData     raw block; its column schema section (one int8_t type followed by one int32_t
 *             byte length per column) is compared against the table meta
 * pTableMeta  cached table meta
 * pSW         schema wrapper describing the block's columns (names and count)
 *
 * Returns true when the column counts differ, when a block column has no same-named column in
 * the table meta, or when a same-named column differs in type or byte length.
 */
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
  // column length |
  // The column schema starts after five 4-byte fields and one 8-byte field.
  const char* fields = (const char*)rawData;
  fields += 5 * sizeof(int32_t);
  fields += sizeof(uint64_t);

  if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
    return true;
  }

  for (int i = 0; i < pSW->nCols; i++) {
    const char* fieldName = pSW->pSchema[i].name;
    bool        found = false;

    for (int j = 0; j < pTableMeta->tableInfo.numOfColumns; j++) {
      SSchema* pColSchema = &pTableMeta->schema[j];
      if (strcmp(pColSchema->name, fieldName) != 0) {
        continue;
      }

      // The schema entries are packed (1 + 4 bytes each), so the int32_t length is not
      // naturally aligned; copy the values out instead of dereferencing a casted pointer.
      int8_t  blockType = 0;
      int32_t blockBytes = 0;
      (void)memcpy(&blockType, fields, sizeof(blockType));
      (void)memcpy(&blockBytes, fields + sizeof(blockType), sizeof(blockBytes));
      if (blockType != pColSchema->type || blockBytes != pColSchema->bytes) {
        return true;
      }
      found = true;
      break;
    }
    fields += sizeof(int8_t) + sizeof(int32_t);

    if (!found) return true;
  }
  return false;
}
1878

1879
/*
 * Look up, or create and register, the three cache hashes that belong to `key` in writeRawCache.
 *
 * pVgHash    out: vgId -> SVgroupInfo
 * pNameHash  out: table name -> tbInfo
 * pMetaHash  out: suid -> STableMeta* (entries are released through tmqFreeMeta)
 * key        cache key; the pointer value itself is hashed
 *
 * Returns 0 on success. On failure the error code is returned, every hash created by this call
 * is destroyed and all three out parameters are set to NULL. The returned hashes are referenced
 * by the writeRawCache entry, which tears them down through freeRawCache.
 */
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
  int32_t code = 0;
  void*   cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
  if (cacheInfo != NULL) {
    rawCacheInfo* info = (rawCacheInfo*)cacheInfo;
    *pVgHash = info->pVgHash;
    *pNameHash = info->pNameHash;
    *pMetaHash = info->pMetaHash;
    return 0;
  }

  // Start from a clean slate so the error path below only ever touches hashes created here,
  // never stale values passed in by the caller.
  *pVgHash = NULL;
  *pNameHash = NULL;
  *pMetaHash = NULL;

  *pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(*pVgHash);
  *pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(*pNameHash);
  *pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(*pMetaHash);
  taosHashSetFreeFp(*pMetaHash, tmqFreeMeta);
  rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash};
  RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));

  return 0;
end:
  taosHashCleanup(*pMetaHash);
  taosHashCleanup(*pNameHash);
  taosHashCleanup(*pVgHash);
  // do not leave the caller holding pointers to destroyed hashes
  *pMetaHash = NULL;
  *pNameHash = NULL;
  *pVgHash = NULL;
  return code;
}
1906

1907
/*
 * Create a synchronous request object for a raw write and fill in the catalog handle and the
 * connection info derived from it.
 *
 * taos      connection handle; dereferenced as an int64_t id that is handed to buildRequest
 * pRequest  out: the request created by buildRequest (may be set even when an error is returned)
 * pCatalog  out: catalog handle for the request's cluster
 * conn      out: transporter, request id, request ref id and mgmt endpoint set
 *
 * Returns 0 on success, TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database, or
 * the error of the failing step.
 */
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
  int32_t code = 0;
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));

  SRequestObj* pReq = *pRequest;
  pReq->syncQuery = true;
  if (pReq->pDb == NULL) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  STscObj* pTscObj = pReq->pTscObj;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, pCatalog));

  conn->pTrans = pTscObj->pAppInfo->pTransporter;
  conn->requestId = pReq->requestId;
  conn->requestObjRefId = pReq->self;
  conn->mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

end:
  return code;
}
1925

1926
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
1927
static int32_t  decodeRawData(SDecoder* decoder, void* data, int32_t dataLen, _raw_decode_func_ func,
47✔
1928
                              SMqRspObj* rspObj) {
1929
   int8_t dataVersion = *(int8_t*)data;
47✔
1930
   if (dataVersion >= MQ_DATA_RSP_VERSION) {
47!
1931
     data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
47✔
1932
     dataLen -= sizeof(int8_t) + sizeof(int32_t);
47✔
1933
  }
1934

1935
   rspObj->resIter = -1;
47✔
1936
   tDecoderInit(decoder, data, dataLen);
47✔
1937
   int32_t code = func(decoder, &rspObj->dataRsp);
47✔
1938
   if (code != 0) {
47!
1939
     SET_ERROR_MSG("decode mq taosx data rsp failed");
×
1940
  }
1941
   return code;
47✔
1942
}
1943

1944
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
128✔
1945
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
1946
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
1947
  int32_t     code = 0;
128✔
1948
  STableMeta* pTableMeta = NULL;
128✔
1949
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
128✔
1950
  if (tmpInfo == NULL || retry > 0) {
128!
1951
    tbInfo info = {0};
121✔
1952

1953
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
121!
1954
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
121!
1955
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
24✔
1956
    }
1957
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
121!
1958
    info.uid = pTableMeta->uid;
121✔
1959
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
121✔
1960
      info.suid = pTableMeta->suid;
82✔
1961
    } else {
1962
      info.suid = pTableMeta->uid;
39✔
1963
    }
1964
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
121✔
1965
    if (code != 0) {
121!
1966
      taosMemoryFree(pTableMeta);
×
1967
      goto end;
×
1968
    }
1969
    if (pCreateReqDst) {
121✔
1970
      pTableMeta->vgId = info.vgInfo.vgId;
24✔
1971
      pTableMeta->uid = pCreateReqDst->uid;
24✔
1972
      pCreateReqDst->ctb.suid = pTableMeta->suid;
24✔
1973
    }
1974

1975
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
121!
1976
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
121✔
1977
    RAW_RETURN_CHECK(
121!
1978
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
1979
  }
1980

1981
  if (pTableMeta == NULL || retry > 0) {
128!
1982
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
7✔
1983
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
7!
1984
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
3!
1985
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
3✔
1986
      if (code != 0) {
3!
1987
        taosMemoryFree(pTableMeta);
×
1988
        goto end;
×
1989
      }
1990

1991
    } else {
1992
      pTableMeta = *pTableMetaTmp;
4✔
1993
      pTableMeta->uid = tmpInfo->uid;
4✔
1994
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
4✔
1995
    }
1996
  }
1997
  *pMeta = pTableMeta;
128✔
1998

1999
end:
128✔
2000
  return code;
128✔
2001
}
2002

2003
/*
 * Write the data blocks of a raw TMQ data response into the connection's current database.
 *
 * The payload is decoded with tDecodeMqDataRsp. Every block is bound against the table meta
 * obtained through the per-connection caches, and all blocks are submitted as one query. When
 * the query fails with an error for which NEED_CLIENT_HANDLE_ERROR holds, the whole batch is
 * rebuilt with refreshed meta and resubmitted, up to three times.
 *
 * taos     connection handle; also used as the key of the raw-write cache
 * data     raw payload
 * dataLen  payload length in bytes
 *
 * Returns TSDB_CODE_SUCCESS or an error code. A response without schema makes the function
 * stop at the first block and return the current code unchanged.
 */
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};
  SQuery*          pQuery = NULL;
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));

  // the three hashes are owned by the raw-write cache, not by this function
  SHashObj* pVgHash = NULL;
  SHashObj* pNameHash = NULL;
  SHashObj* pMetaHash = NULL;
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  int retry = 0;
  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);

    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      if (!rspObj.dataRsp.withSchema) {
        goto end;
      }

      const int32_t blockIdx = rspObj.resIter;

      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, blockIdx);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, blockIdx);
      RAW_NULL_CHECK(pSW);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, blockIdx);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // the meta returned here lives in pMetaHash and must not be freed by this function
      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, pSW,
                                        rawData, retry));

      char err[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // done unless the error is one the client may handle and retries are left
    if (!(NEED_CLIENT_HANDLE_ERROR(code)) || retry++ >= 3) {
      break;
    }

    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;  // replay all blocks from the beginning
  }

end:
  uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  return code;
}
2075

2076
/*
 * Apply a serialized "metadata" payload -- data blocks together with the
 * create-table requests that travel with them -- to the connection `taos`.
 *
 * `data`/`dataLen` are handed to decodeRawData() with tDecodeSTaosxRsp as the
 * decode routine. Every block of the decoded response is bound into a single
 * query which is then launched. When the resulting request code satisfies
 * NEED_CLIENT_HANDLE_ERROR the whole bind-and-launch sequence is repeated, at
 * most three additional times.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code that was encountered.
 * Note that a response found to carry no schema ends processing with `code`
 * left as it was.
 */
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
  int32_t          code = TSDB_CODE_SUCCESS;
  SQuery*          pQuery = NULL;
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};
  SHashObj*        pCreateTbHash = NULL;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;
  int              retry = 0;

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));

  // table name -> create-table request, filled from the decoded response
  pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pCreateTbHash);
  RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));

  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);

    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      if (!rspObj.dataRsp.withSchema) {
        goto end;
      }

      // per-block inputs: table name, schema and the raw column data
      const char* blockTbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(blockTbName);
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
      RAW_NULL_CHECK(pSW);
      void* pBlock = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pBlock);
      void* pBlockRaw = getRawDataFromRes(pBlock);
      RAW_NULL_CHECK(pBlockRaw);

      uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, blockTbName);

      SName tableName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(tableName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(tableName.tname, blockTbName, TSDB_TABLE_NAME_LEN);

      // find schema data info
      SVCreateTbReq* pCreateReqDst =
          (SVCreateTbReq*)taosHashGet(pCreateTbHash, tableName.tname, strlen(tableName.tname));
      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &tableName,
                                        &pTableMeta, pSW, pBlockRaw, retry));

      char bindErr[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pQuery, pTableMeta, pBlockRaw, pCreateReqDst, pSW, pSW->nCols, true, bindErr,
                              ERR_MSG_LEN, true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", tableName.tname, bindErr);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // done unless the error is one the client should handle and retries remain
    if (!NEED_CLIENT_HANDLE_ERROR(code) || retry++ >= 3) {
      break;
    }

    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;  // rewind so the next pass walks every block again
  }

end:
  uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteSTaosxRsp(&rspObj.dataRsp);
  for (void* pEntry = taosHashIterate(pCreateTbHash, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pCreateTbHash, pEntry)) {
    tDestroySVCreateTbReq(pEntry, TSDB_MSG_FLG_DECODE);
  }
  taosHashCleanup(pCreateTbHash);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  return code;
}
2163

2164
/*
 * Dispatch a single meta response to the JSON builder matching its
 * `resMsgType`. The builder receives `meta` and is responsible for storing
 * its result there; for any message type not listed below `*meta` is left
 * untouched.
 */
static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
  if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) {
    processCreateStb(pMetaRsp, meta);
  } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_STB) {
    processAlterStb(pMetaRsp, meta);
  } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_STB) {
    processDropSTable(pMetaRsp, meta);
  } else if (pMetaRsp->resMsgType == TDMT_VND_CREATE_TABLE) {
    processCreateTable(pMetaRsp, meta);
  } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_TABLE) {
    processAlterTable(pMetaRsp, meta);
  } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_TABLE) {
    // a second, identical TDMT_VND_DROP_TABLE branch used to follow this one;
    // it could never be reached and has been removed
    processDropTable(pMetaRsp, meta);
  } else if (pMetaRsp->resMsgType == TDMT_VND_DELETE) {
    processDeleteTable(pMetaRsp, meta);
  }
}
289✔
2183

2184
/*
 * Render a batch meta response as one JSON document of the form
 *   {"tmq_meta_version": TMQ_META_VERSION, "metas": [ ... ]}
 * where each array element is produced by processSimpleMeta() from one entry
 * of the batch.
 *
 * pMsgRsp : batch response whose pMetaBuff/metaBuffLen hold the encoded batch.
 * string  : receives the printed JSON on success. On any failure `*string` is
 *           not written, so the caller must have initialised it.
 *
 * All decoders and intermediate objects are released on every path.
 */
static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
  SDecoder        coder = {0};
  SMqBatchMetaRsp rsp = {0};
  int32_t         code = 0;  // written by the RAW_*_CHECK macros
  cJSON*          pJson = NULL;

  tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
  if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
    goto end;
  }

  pJson = cJSON_CreateObject();
  RAW_NULL_CHECK(pJson);
  RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION));
  cJSON* pMetaArr = cJSON_CreateArray();
  RAW_NULL_CHECK(pMetaArr);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(pJson, "metas", pMetaArr));

  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // an entry shorter than its header would make the size below wrap around
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      goto end;
    }

    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    cJSON*     pItem = NULL;
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    int32_t decodeRet = tDecodeMqMetaRsp(&metaCoder, &metaRsp);
    if (decodeRet >= 0) {
      processSimpleMeta(&metaRsp, &pItem);
    }
    // per-entry resources are released before any early exit
    tDeleteMqMetaRsp(&metaRsp);
    tDecoderClear(&metaCoder);
    if (decodeRet < 0) {
      goto end;
    }
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(pMetaArr, pItem));
  }

  *string = cJSON_PrintUnformatted(pJson);

end:
  cJSON_Delete(pJson);
  tDeleteMqBatchMetaRsp(&rsp);
  tDecoderClear(&coder);
}
2229

2230
/*
 * Build the JSON description of a TMQ result.
 *
 * res : result handle; must be a meta, metadata or batch-meta result.
 *
 * Returns the JSON text, or NULL when `res` is NULL, is of another result
 * type, or the JSON could not be produced.
 * NOTE(review): the result is presumably meant to be released with
 * tmq_free_json_meta() -- confirm against the public header.
 */
char* tmq_get_json_meta(TAOS_RES* res) {
  if (res == NULL) return NULL;
  uDebug("tmq_get_json_meta res:%p", res);
  if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res) && !TD_RES_TMQ_BATCH_META(res)) {
    return NULL;
  }

  char*      string = NULL;
  SMqRspObj* rspObj = (SMqRspObj*)res;
  if (TD_RES_TMQ_METADATA(res)) {
    processAutoCreateTable(&rspObj->dataRsp, &string);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    processBatchMetaToJson(&rspObj->batchMetaRsp, &string);
  } else if (TD_RES_TMQ_META(res)) {
    cJSON* pJson = NULL;
    processSimpleMeta(&rspObj->metaRsp, &pJson);
    string = cJSON_PrintUnformatted(pJson);
    cJSON_Delete(pJson);
  } else {
    // defensive only: the type check above already rejects every other type
    uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res);
  }

  // `string` stays NULL when a builder fails; handing NULL to %s is undefined
  uDebug("tmq_get_json_meta string:%s", string != NULL ? string : "(null)");
  return string;
}
2255

2256
/*
 * Release a JSON string through taosMemoryFreeClear().
 */
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}
228!
2257

2258
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
47✔
2259
  SEncoder coder = {0};
47✔
2260
  tEncoderInit(&coder, NULL, 0);
47✔
2261
  if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1;
47!
2262
  if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1;
47!
2263
  int32_t pos = coder.pos;
47✔
2264
  tEncoderClear(&coder);
47✔
2265
  return pos;
47✔
2266
}
2267

2268
// Signature of the routines that serialize an SMqDataRsp into an SEncoder;
// encodeMqDataRsp() treats a negative return value as failure.
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
2269
/*
 * Serialize `rspObj` into a newly allocated buffer laid out as
 *   int8  MQ_DATA_RSP_VERSION
 *   int32 value returned by getOffSetLen()
 *   ...   output of encodeFunc
 *
 * On success the buffer and its length are stored in raw->raw / raw->raw_len
 * (raw->raw_type is not touched here). On failure nothing is stored in `raw`,
 * the buffer is freed and an error code is returned: `terrno` when the
 * allocation fails, TSDB_CODE_INVALID_MSG for every encoding problem.
 */
static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
  int32_t  code = 0;
  int32_t  totalLen = 0;
  void*    payload = NULL;
  SEncoder encoder = {0};

  // size of the encoded body
  tEncodeSize(encodeFunc, rspObj, totalLen, code);
  if (code < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto FAILED;
  }

  // room for the version byte and the offset length in front of the body
  totalLen += sizeof(int8_t) + sizeof(int32_t);
  payload = taosMemoryCalloc(1, totalLen);
  if (payload == NULL) {
    code = terrno;
    goto FAILED;
  }
  tEncoderInit(&encoder, payload, totalLen);

  // the steps run strictly left to right and stop at the first failure
  int32_t offsetLen = 0;
  if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0 || (offsetLen = getOffSetLen(rspObj)) <= 0 ||
      tEncodeI32(&encoder, offsetLen) < 0 || encodeFunc(&encoder, rspObj) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto FAILED;
  }
  tEncoderClear(&encoder);

  raw->raw = payload;
  raw->raw_len = totalLen;
  return code;

FAILED:
  tEncoderClear(&encoder);
  taosMemoryFree(payload);
  return code;
}
2313

2314
/*
 * Fill `raw` with the raw representation of the TMQ result `res`.
 *
 * - meta / batch-meta results: `raw` points at the buffer already held by the
 *   result object.
 * - data / metadata results: the response is re-encoded by encodeMqDataRsp()
 *   into a newly allocated buffer.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when either argument is
 * NULL, the encoder's error code when re-encoding fails, or
 * TSDB_CODE_TMQ_INVALID_MSG for any other result type.
 */
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (!raw || !res) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqRspObj* rspObj = ((SMqRspObj*)res);
  if (TD_RES_TMQ_META(res)) {
    raw->raw = rspObj->metaRsp.metaRsp;
    raw->raw_len = rspObj->metaRsp.metaRspLen;
    raw->raw_type = rspObj->metaRsp.resMsgType;
    uDebug("tmq get raw type meta:%p", raw);
  } else if (TD_RES_TMQ(res)) {
    int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw);
    if (code != 0) {
      // log the code actually returned; terrno is not set by most failure paths
      uError("tmq get raw type error:%d", code);
      return code;
    }
    raw->raw_type = RES_TYPE__TMQ;
    uDebug("tmq get raw type data:%p", raw);
  } else if (TD_RES_TMQ_METADATA(res)) {
    int32_t code = encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw);
    if (code != 0) {
      // log the code actually returned; terrno is not set by most failure paths
      uError("tmq get raw type error:%d", code);
      return code;
    }
    raw->raw_type = RES_TYPE__TMQ_METADATA;
    uDebug("tmq get raw type metadata:%p", raw);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    raw->raw = rspObj->batchMetaRsp.pMetaBuff;
    raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw batch meta:%p", raw);
  } else {
    uError("tmq get raw error type:%d", *(int8_t*)res);
    return TSDB_CODE_TMQ_INVALID_MSG;
  }
  return TSDB_CODE_SUCCESS;
}
2351

2352
/*
 * Release what a raw data descriptor holds. The buffer is freed only for the
 * RES_TYPE__TMQ and RES_TYPE__TMQ_METADATA types; for every other type
 * raw.raw is left alone. The terrMsg buffer is zeroed in all cases.
 */
void tmq_free_raw(tmq_raw_data raw) {
  uDebug("tmq free raw data type:%d", raw.raw_type);

  const int isDataType = (raw.raw_type == RES_TYPE__TMQ);
  const int isMetadataType = (raw.raw_type == RES_TYPE__TMQ_METADATA);
  if (isDataType || isMetadataType) {
    taosMemoryFree(raw.raw);
  }

  (void)memset(terrMsg, 0, ERR_MSG_LEN);
}
246✔
2359

2360
static int32_t writeRawInit() {
350✔
2361
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
365✔
2362
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
15✔
2363
    if (old == 0) {
15!
2364
      int32_t code = initRawCacheHash();
15✔
2365
      if (code != 0) {
15!
2366
        uError("tmq writeRawImpl init error:%d", code);
×
2367
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
2368
        return code;
×
2369
      }
2370
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
15✔
2371
    }
2372
  }
2373

2374
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
350!
2375
    return TSDB_CODE_INTERNAL_ERROR;
×
2376
  }
2377
  return 0;
350✔
2378
}
2379

2380
/*
 * Route a raw payload to the handler selected by `type`.
 *
 * Returns the handler's result, TSDB_CODE_INTERNAL_ERROR when writeRawInit()
 * fails, or TSDB_CODE_INVALID_PARA for a type with no handler.
 */
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
  if (writeRawInit() != 0) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  // TDMT_VND_ALTER_STB is routed to the same handler as TDMT_VND_CREATE_STB
  if (type == TDMT_VND_CREATE_STB || type == TDMT_VND_ALTER_STB) {
    return taosCreateStb(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_STB) {
    return taosDropStb(taos, buf, len);
  }
  if (type == TDMT_VND_CREATE_TABLE) {
    return taosCreateTable(taos, buf, len);
  }
  if (type == TDMT_VND_ALTER_TABLE) {
    return taosAlterTable(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_TABLE) {
    return taosDropTable(taos, buf, len);
  }
  if (type == TDMT_VND_DELETE) {
    return taosDeleteData(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_METADATA) {
    return tmqWriteRawMetaDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ) {
    return tmqWriteRawDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_BATCH_META) {
    return tmqWriteBatchMetaDataImpl(taos, buf, len);
  }

  return TSDB_CODE_INVALID_PARA;
}
2408

2409
/*
 * Write a raw data descriptor to the connection `taos`.
 *
 * Returns TSDB_CODE_INVALID_PARA (after recording an error message) when the
 * connection or the buffer is NULL or the length is not positive; otherwise
 * the result of writeRawImpl().
 */
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
  const int argsUsable = (taos != NULL) && (raw.raw != NULL) && !(raw.raw_len <= 0);
  if (!argsUsable) {
    SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
    return TSDB_CODE_INVALID_PARA;
  }

  return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
}
2417

2418
/*
 * Decode a batch meta payload and write each contained meta entry, in order,
 * through writeRawImpl().
 *
 * Returns TSDB_CODE_SUCCESS when every entry was written,
 * TSDB_CODE_INVALID_PARA for NULL arguments, an undecodable payload or an
 * entry shorter than its SMqRspHead header, `terrno` when an array lookup
 * fails, or the first non-success code returned by writeRawImpl(). Processing
 * stops at the first failure. Both decoders are cleared on every path.
 */
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqBatchMetaRsp rsp = {0};
  SDecoder        coder = {0};
  int32_t         code = TSDB_CODE_SUCCESS;

  // decode and process req
  tDecoderInit(&coder, meta, metaLen);
  if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // an entry shorter than its header would make the size below wrap around
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }

    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
      code = TSDB_CODE_INVALID_PARA;
    } else {
      code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
    }
    // per-entry resources are released before any early exit
    tDeleteMqMetaRsp(&metaRsp);
    tDecoderClear(&metaCoder);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
  }

end:
  tDeleteMqBatchMetaRsp(&rsp);
  tDecoderClear(&coder);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc