• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3544

30 Nov 2024 03:06AM UTC coverage: 60.88% (+0.04%) from 60.842%
#3544

push

travis-ci

web-flow
Merge pull request #28988 from taosdata/main

merge: from main to 3.0 branch

120724 of 253479 branches covered (47.63%)

Branch coverage included in aggregate %.

407 of 489 new or added lines in 21 files covered. (83.23%)

1148 existing lines in 113 files now uncovered.

201919 of 276488 relevant lines covered (73.03%)

18898587.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.43
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "parser.h"
19
#include "tcol.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tdef.h"
23
#include "tglobal.h"
24
#include "tmsgtype.h"
25

26
/*
 * Error-propagation helpers. Each one expects the enclosing function to have
 * an `int32_t code` local and an `end:` label to jump to.
 *
 * The argument is parenthesized at every use so that a compound expression
 * (for example `a || b` or `x ? p : q`) is tested as a whole instead of being
 * split by the precedence of `!` / `==`.
 */

// Jump to `end` with code = terrno when the pointer expression is NULL.
#define RAW_NULL_CHECK(c) \
  do {                    \
    if ((c) == NULL) {    \
      code = terrno;      \
      goto end;           \
    }                     \
  } while (0)

// Jump to `end` with code = TSDB_CODE_INVALID_PARA when the expression is false.
#define RAW_FALSE_CHECK(c)           \
  do {                               \
    if (!(c)) {                      \
      code = TSDB_CODE_INVALID_PARA; \
      goto end;                      \
    }                                \
  } while (0)

// Store the expression in `code` and jump to `end` when it is non-zero.
#define RAW_RETURN_CHECK(c) \
  do {                      \
    code = (c);             \
    if (code != 0) {        \
      goto end;             \
    }                       \
  } while (0)
49

50
#define LOG_ID_TAG   "connId:0x%" PRIx64 ",QID:0x%" PRIx64
51
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
52

53
#define TMQ_META_VERSION "1.0"
54

55
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen);
56
// Offset a super-table uid by the 32-bit Murmur hash of the database name string.
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
  size_t dbNameLen = strlen(db);
  return suid + MurmurHash3_32(db, dbNameLen);
}
118✔
57
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t,
96✔
58
                                 SColCmprWrapper* pColCmprRow, cJSON** pJson) {
59
  int32_t code = TSDB_CODE_SUCCESS;
96✔
60
  int8_t  buildDefaultCompress = 0;
96✔
61
  if (pColCmprRow->nCols <= 0) {
96!
62
    buildDefaultCompress = 1;
×
63
  }
64

65
  char*  string = NULL;
96✔
66
  cJSON* json = cJSON_CreateObject();
96✔
67
  RAW_NULL_CHECK(json);
96!
68
  cJSON* type = cJSON_CreateString("create");
96✔
69
  RAW_NULL_CHECK(type);
96!
70

71
  RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type));
96!
72
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
96✔
73
  RAW_NULL_CHECK(tableType);
96!
74
  RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType));
96!
75
  cJSON* tableName = cJSON_CreateString(name);
96✔
76
  RAW_NULL_CHECK(tableName);
96!
77
  RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName));
96!
78

79
  cJSON* columns = cJSON_CreateArray();
96✔
80
  RAW_NULL_CHECK(columns);
96!
81
  for (int i = 0; i < schemaRow->nCols; i++) {
584✔
82
    cJSON* column = cJSON_CreateObject();
488✔
83
    RAW_NULL_CHECK(column);
488!
84
    SSchema* s = schemaRow->pSchema + i;
488✔
85
    cJSON*   cname = cJSON_CreateString(s->name);
488✔
86
    RAW_NULL_CHECK(cname);
488!
87
    RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "name", cname));
488!
88
    cJSON* ctype = cJSON_CreateNumber(s->type);
488✔
89
    RAW_NULL_CHECK(ctype);
488!
90
    RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "type", ctype));
488!
91
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
546!
92
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
58✔
93
      cJSON*  cbytes = cJSON_CreateNumber(length);
58✔
94
      RAW_NULL_CHECK(cbytes);
58!
95
      RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "length", cbytes));
58!
96
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
430✔
97
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
8✔
98
      cJSON*  cbytes = cJSON_CreateNumber(length);
8✔
99
      RAW_NULL_CHECK(cbytes);
8!
100
      RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "length", cbytes));
8!
101
    }
102
    cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
488✔
103
    RAW_NULL_CHECK(isPk);
488!
104
    RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "isPrimarykey", isPk));
488!
105
    RAW_FALSE_CHECK(cJSON_AddItemToArray(columns, column));
488!
106

107
    if (pColCmprRow == NULL) {
488!
108
      continue;
×
109
    }
110

111
    uint32_t alg = 0;
488✔
112
    if (buildDefaultCompress) {
488!
113
      alg = createDefaultColCmprByType(s->type);
×
114
    } else {
115
      SColCmpr* pColCmpr = pColCmprRow->pColCmpr + i;
488✔
116
      alg = pColCmpr->alg;
488✔
117
    }
118
    const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(alg));
488✔
119
    RAW_NULL_CHECK(encode);
488!
120
    const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(alg));
488✔
121
    RAW_NULL_CHECK(compress);
488!
122
    const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(alg));
488✔
123
    RAW_NULL_CHECK(level);
488!
124

125
    cJSON* encodeJson = cJSON_CreateString(encode);
488✔
126
    RAW_NULL_CHECK(encodeJson);
488!
127
    RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "encode", encodeJson));
488!
128

129
    cJSON* compressJson = cJSON_CreateString(compress);
488✔
130
    RAW_NULL_CHECK(compressJson);
488!
131
    RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "compress", compressJson));
488!
132

133
    cJSON* levelJson = cJSON_CreateString(level);
488✔
134
    RAW_NULL_CHECK(levelJson);
488!
135
    RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "level", levelJson));
488!
136
  }
137
  RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "columns", columns));
96!
138

139
  cJSON* tags = cJSON_CreateArray();
96✔
140
  RAW_NULL_CHECK(tags);
96!
141
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
290✔
142
    cJSON* tag = cJSON_CreateObject();
194✔
143
    RAW_NULL_CHECK(tag);
194!
144
    SSchema* s = schemaTag->pSchema + i;
194✔
145
    cJSON*   tname = cJSON_CreateString(s->name);
194✔
146
    RAW_NULL_CHECK(tname);
194!
147
    RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "name", tname));
194!
148
    cJSON* ttype = cJSON_CreateNumber(s->type);
194✔
149
    RAW_NULL_CHECK(ttype);
194!
150
    RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "type", ttype));
194!
151
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
202!
152
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
8✔
153
      cJSON*  cbytes = cJSON_CreateNumber(length);
8✔
154
      RAW_NULL_CHECK(cbytes);
8!
155
      RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "length", cbytes));
8!
156
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
186✔
157
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
55✔
158
      cJSON*  cbytes = cJSON_CreateNumber(length);
55✔
159
      RAW_NULL_CHECK(cbytes);
55!
160
      RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "length", cbytes));
55!
161
    }
162
    RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag));
194!
163
  }
164
  RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags));
96!
165

166
end:
96✔
167
  *pJson = json;
96✔
168
}
96✔
169

170
/*
 * Add exactly one compression-related key to `json`, chosen from `para`.
 *
 * The first non-zero field of `para`, tested in the order encode (L1 type),
 * compress (L2 type), level (L2 level), is converted to its name and stored
 * under "encode", "compress" or "level" respectively. If all three are zero
 * nothing is added.
 *
 * Returns 0 on success; terrno when a name lookup or node creation yields
 * NULL; TSDB_CODE_INVALID_PARA when the node cannot be attached (the node is
 * released in that case instead of being leaked).
 */
static int32_t setCompressOption(cJSON* json, uint32_t para) {
  int32_t     code = 0;
  const char* key = NULL;
  const char* value = NULL;

  uint8_t encode = COMPRESS_L1_TYPE_U32(para);
  if (encode != 0) {
    key = "encode";
    value = columnEncodeStr(encode);
  } else {
    uint8_t compress = COMPRESS_L2_TYPE_U32(para);
    if (compress != 0) {
      key = "compress";
      value = columnCompressStr(compress);
    } else {
      uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para);
      if (level == 0) {
        return code;  // nothing to report
      }
      key = "level";
      value = columnLevelStr(level);
    }
  }
  RAW_NULL_CHECK(value);

  cJSON* item = cJSON_CreateString(value);
  RAW_NULL_CHECK(item);
  if (!cJSON_AddItemToObject(json, key, item)) {
    cJSON_Delete(item);  // not owned by json, so free it here
    code = TSDB_CODE_INVALID_PARA;
  }

end:
  return code;
}
203
/*
 * Add "colLength" to `json` for variable-length column types.
 * BINARY / VARBINARY / GEOMETRY report bytes minus the var-string header;
 * NCHAR additionally divides by TSDB_NCHAR_SIZE. Other types add nothing.
 * Returns 0 on success, TSDB_CODE_INVALID_PARA if the number cannot be added.
 */
static int32_t alterStbAddColLength(cJSON* json, int32_t type, int32_t bytes) {
  int32_t length = 0;
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_GEOMETRY) {
    length = bytes - VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_NCHAR) {
    length = (bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  } else {
    return TSDB_CODE_SUCCESS;
  }
  return cJSON_AddNumberToObject(json, "colLength", length) == NULL ? TSDB_CODE_INVALID_PARA : TSDB_CODE_SUCCESS;
}

/*
 * Deserialize an SMAlterStbReq from `alterData` and describe it as JSON.
 *
 * The object always starts with "type":"alter", "tableType":"super",
 * "tableName" and "alterType"; the remaining keys depend on req.alterType
 * (colName / colType / colLength / colNewName / compression option).
 *
 * *pJson receives the object, NULL if deserialization or root creation
 * failed, or a partially filled object if a later step failed. Nodes are
 * attached to the root as they are created, so no node is leaked on an early
 * exit.
 */
static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) {
  SMAlterStbReq req = {0};
  cJSON*        json = NULL;
  int32_t       code = 0;

  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) {
    goto end;
  }

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  RAW_NULL_CHECK(cJSON_AddStringToObject(json, "type", "alter"));
  SName name = {0};
  RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
  RAW_NULL_CHECK(cJSON_AddStringToObject(json, "tableType", "super"));
  RAW_NULL_CHECK(cJSON_AddStringToObject(json, "tableName", name.tname));
  RAW_NULL_CHECK(cJSON_AddNumberToObject(json, "alterType", req.alterType));

  switch (req.alterType) {
    // These four emit the same keys: colName, colType and (if variable-length) colLength.
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN:
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      RAW_NULL_CHECK(cJSON_AddStringToObject(json, "colName", field->name));
      RAW_NULL_CHECK(cJSON_AddNumberToObject(json, "colType", field->type));
      RAW_RETURN_CHECK(alterStbAddColLength(json, field->type, field->bytes));
      break;
    }
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      SFieldWithOptions* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      RAW_NULL_CHECK(cJSON_AddStringToObject(json, "colName", field->name));
      RAW_NULL_CHECK(cJSON_AddNumberToObject(json, "colType", field->type));
      RAW_RETURN_CHECK(alterStbAddColLength(json, field->type, field->bytes));
      RAW_RETURN_CHECK(setCompressOption(json, field->compress));
      break;
    }
    case TSDB_ALTER_TABLE_DROP_TAG:
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      RAW_NULL_CHECK(cJSON_AddStringToObject(json, "colName", field->name));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      // pFields[0] carries the old name, pFields[1] the new one.
      TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(oldField);
      TAOS_FIELD* newField = taosArrayGet(req.pFields, 1);
      RAW_NULL_CHECK(newField);
      RAW_NULL_CHECK(cJSON_AddStringToObject(json, "colName", oldField->name));
      RAW_NULL_CHECK(cJSON_AddStringToObject(json, "colNewName", newField->name));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      RAW_NULL_CHECK(cJSON_AddStringToObject(json, "colName", field->name));
      // For this alter type the `bytes` member is what gets passed as the compression parameter.
      RAW_RETURN_CHECK(setCompressOption(json, field->bytes));
      break;
    }
    default:
      break;
  }

end:
  tFreeSMAltertbReq(&req);
  *pJson = json;
}
40✔
345

346
/*
 * Decode the SVCreateStbReq that follows the SMsgHead in metaRsp->metaRsp and
 * build the "create super table" JSON for it via buildCreateTableJson().
 * *pJson is written only when decoding succeeds.
 */
static void processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  SVCreateStbReq req = {0};
  // Zero-initialized like the decoder in processAlterStb, so tDecoderClear()
  // never sees indeterminate state.
  SDecoder coder = {0};

  uDebug("create stable data:%p", metaRsp);
  // decode and process req
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);

  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    goto end;
  }
  buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr, pJson);

end:
  uDebug("create stable return");
  tDecoderClear(&coder);
}
78✔
365

366
/*
 * Decode the SVCreateStbReq carried after the SMsgHead in metaRsp->metaRsp
 * and, when decoding succeeds, turn its original alter payload
 * (alterOriData / alterOriDataLen) into JSON through buildAlterSTableJson().
 */
static void processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  SDecoder       stbDecoder = {0};
  SVCreateStbReq stbReq = {0};

  uDebug("alter stable data:%p", metaRsp);

  // The request body starts right after the message head.
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&stbDecoder, body, bodyLen);

  if (tDecodeSVCreateStbReq(&stbDecoder, &stbReq) >= 0) {
    buildAlterSTableJson(stbReq.alterOriData, stbReq.alterOriDataLen, pJson);
  }

  uDebug("alter stable return");
  tDecoderClear(&stbDecoder);
}
40✔
385

386
/*
 * Fill `json` with the description of one child table taken from pCreateReq:
 * "tableName", "using" (super table name), "tagNum" and a "tags" array with
 * one {name, type, value} object per tag. A JSON-typed tag is emitted as a
 * single entry whose value is the text produced by parseTagDatatoJson().
 *
 * This function reports no status. On any failure it stops early and attaches
 * whatever part of "tags" was completed (possibly an empty array).
 *
 * Cleanup runs exactly once at `end`: a half-built tag object, the conversion
 * buffer and the JSON tag text are released there, and the attach of "tags"
 * no longer jumps back to `end` on failure.
 */
static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
  STag*   pTag = (STag*)pCreateReq->ctb.pTag;
  SArray* tagName = pCreateReq->ctb.tagName;
  uint8_t tagNum = pCreateReq->ctb.tagNum;
  SArray* pTagVals = NULL;
  char*   pJsonStr = NULL;  // text of a JSON-typed tag
  char*   buf = NULL;       // scratch buffer for variable-length tag values
  cJSON*  tag = NULL;       // tag object not yet owned by `tags`
  cJSON*  tags = NULL;
  int32_t code = 0;

  RAW_NULL_CHECK(cJSON_AddStringToObject(json, "tableName", pCreateReq->name));
  RAW_NULL_CHECK(cJSON_AddStringToObject(json, "using", pCreateReq->ctb.stbName));
  RAW_NULL_CHECK(cJSON_AddNumberToObject(json, "tagNum", tagNum));

  tags = cJSON_CreateArray();
  RAW_NULL_CHECK(tags);
  RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals));

  if (tTagIsJson(pTag)) {
    if (pTag->nTag == 0) {
      uError("p->nTag == 0");
      goto end;
    }
    parseTagDatatoJson(pTag, &pJsonStr);
    if (pJsonStr == NULL) {
      uError("parseTagDatatoJson failed, pJson == NULL");
      goto end;
    }
    tag = cJSON_CreateObject();
    RAW_NULL_CHECK(tag);
    RAW_NULL_CHECK(taosArrayGet(pTagVals, 0));
    char* ptname = taosArrayGet(tagName, 0);
    RAW_NULL_CHECK(ptname);
    RAW_NULL_CHECK(cJSON_AddStringToObject(tag, "name", ptname));
    RAW_NULL_CHECK(cJSON_AddNumberToObject(tag, "type", TSDB_DATA_TYPE_JSON));
    RAW_NULL_CHECK(cJSON_AddStringToObject(tag, "value", pJsonStr));
    RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag));
    tag = NULL;  // now owned by `tags`
    goto end;
  }

  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
    RAW_NULL_CHECK(pTagVal);
    tag = cJSON_CreateObject();
    RAW_NULL_CHECK(tag);
    char* ptname = taosArrayGet(tagName, i);
    RAW_NULL_CHECK(ptname);
    RAW_NULL_CHECK(cJSON_AddStringToObject(tag, "name", ptname));
    RAW_NULL_CHECK(cJSON_AddNumberToObject(tag, "type", pTagVal->type));

    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      int64_t bufSize = 0;
      if (pTagVal->type == TSDB_DATA_TYPE_VARBINARY) {
        bufSize = pTagVal->nData * 2 + 2 + 3;
      } else {
        bufSize = pTagVal->nData + 3;
      }
      buf = taosMemoryCalloc(bufSize, 1);
      RAW_NULL_CHECK(buf);
      if (dataConverToStr(buf, bufSize, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL) != TSDB_CODE_SUCCESS) {
        goto end;
      }
      RAW_NULL_CHECK(cJSON_AddStringToObject(tag, "value", buf));
      taosMemoryFree(buf);
      buf = NULL;
    } else {
      double val = 0;
      GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64);
      RAW_NULL_CHECK(cJSON_AddNumberToObject(tag, "value", val));
    }

    RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag));
    tag = NULL;  // now owned by `tags`
  }

end:
  cJSON_Delete(tag);
  taosMemoryFree(buf);
  taosMemoryFree(pJsonStr);
  // No RAW_* macro here: a `goto end` from inside the end block would loop forever.
  if (tags != NULL && !cJSON_AddItemToObject(json, "tags", tags)) {
    cJSON_Delete(tags);
  }
  taosArrayDestroy(pTagVals);
}
156✔
492

493
/*
 * Build the JSON object for a "create child table" event.
 *
 * The first request in pCreateReq is described at the top level of the
 * object. When nReqs > 1, every request (including the first) is also
 * described as an element of the "createList" array; otherwise "createList"
 * is an empty array.
 *
 * *pJson receives the object (NULL if the root could not be created, or a
 * partially filled object if a later step failed). "createList" is attached
 * to the root before it is filled, and an element that cannot be appended is
 * freed, so an early exit leaks nothing.
 */
static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) {
  int32_t code = 0;
  cJSON*  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  RAW_NULL_CHECK(cJSON_AddStringToObject(json, "type", "create"));
  RAW_NULL_CHECK(cJSON_AddStringToObject(json, "tableType", "child"));

  buildChildElement(json, pCreateReq);

  cJSON* createList = cJSON_AddArrayToObject(json, "createList");
  RAW_NULL_CHECK(createList);
  for (int i = 0; nReqs > 1 && i < nReqs; i++) {
    cJSON* create = cJSON_CreateObject();
    RAW_NULL_CHECK(create);
    buildChildElement(create, pCreateReq + i);
    if (!cJSON_AddItemToArray(createList, create)) {
      cJSON_Delete(create);
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }
  }

end:
  *pJson = json;
}
123✔
520

521
/*
 * Decode the SVCreateTbBatchReq that follows the SMsgHead in
 * metaRsp->metaRsp and build the matching "create" JSON.
 *
 * The type of the first request decides the shape: a child table hands the
 * whole batch to buildCreateCTableJson(); a normal table describes only the
 * first request via buildCreateTableJson(). Any other type, an empty batch
 * or a decode failure leaves *pJson untouched.
 */
static void processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  SVCreateTbBatchReq batchReq = {0};
  SDecoder           batchDecoder = {0};

  uDebug("create table data:%p", metaRsp);

  // The request body starts right after the message head.
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&batchDecoder, body, bodyLen);

  if (tDecodeSVCreateTbBatchReq(&batchDecoder, &batchReq) >= 0 && batchReq.nReqs > 0) {
    SVCreateTbReq* pFirst = batchReq.pReqs;
    if (pFirst->type == TSDB_CHILD_TABLE) {
      buildCreateCTableJson(batchReq.pReqs, batchReq.nReqs, pJson);
    } else if (pFirst->type == TSDB_NORMAL_TABLE) {
      buildCreateTableJson(&pFirst->ntb.schemaRow, NULL, pFirst->name, pFirst->uid, TSDB_NORMAL_TABLE,
                           &pFirst->colCmpr, pJson);
    }
  }

  uDebug("create table return");
  tDeleteSVCreateTbBatchReq(&batchReq);
  tDecoderClear(&batchDecoder);
}
131✔
550

551
/*
 * Decode every auto-create-table request carried in `rsp` and render them as
 * one unformatted JSON string stored in *string.
 *
 * All rsp->createTableNum requests must decode successfully and be child
 * tables; otherwise the function stops early and *string is left as the
 * caller passed it. One decoder per request is kept until the JSON has been
 * printed and is cleared afterwards.
 */
static void processAutoCreateTable(SMqDataRsp* rsp, char** string) {
  SVCreateTbReq* pCreateReq = NULL;
  SDecoder*      decoder = NULL;
  int32_t        code = 0;

  uDebug("auto create table data:%p", rsp);
  if (rsp->createTableNum <= 0) {
    uError("processAutoCreateTable rsp->createTableNum <= 0");
    goto end;
  }

  decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
  RAW_NULL_CHECK(decoder);
  pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
  RAW_NULL_CHECK(pCreateReq);

  // Decode each serialized request into its own slot.
  for (int32_t idx = 0; idx < rsp->createTableNum; idx++) {
    SDecoder*      pDecoder = &decoder[idx];
    SVCreateTbReq* pReq = &pCreateReq[idx];

    void** ppData = taosArrayGet(rsp->createTableReq, idx);
    RAW_NULL_CHECK(ppData);
    int32_t* pLen = taosArrayGet(rsp->createTableLen, idx);
    RAW_NULL_CHECK(pLen);

    tDecoderInit(pDecoder, *ppData, *pLen);
    if (tDecodeSVCreateTbReq(pDecoder, pReq) < 0) {
      goto end;
    }

    if (pReq->type != TSDB_CHILD_TABLE) {
      uError("processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE");
      goto end;
    }
  }

  cJSON* pJson = NULL;
  buildCreateCTableJson(pCreateReq, rsp->createTableNum, &pJson);
  *string = cJSON_PrintUnformatted(pJson);
  cJSON_Delete(pJson);

end:
  uDebug("auto created table return, sql json:%s", *string);
  if (decoder != NULL && pCreateReq != NULL) {
    for (int i = 0; i < rsp->createTableNum; i++) {
      tDecoderClear(&decoder[i]);
      taosMemoryFreeClear(pCreateReq[i].comment);
      if (pCreateReq[i].type == TSDB_CHILD_TABLE) {
        taosArrayDestroy(pCreateReq[i].ctb.tagName);
      }
    }
  }
  taosMemoryFree(decoder);
  taosMemoryFree(pCreateReq);
}
10✔
600

601
/*
 * Decodes an alter-table meta message and describes it as a cJSON object.
 *
 * metaRsp: message whose payload (after the SMsgHead) is an encoded
 *          SVAlterTbReq.
 * pJson:   out parameter; always written. It is NULL when decoding or the
 *          creation of the root object failed. If a later step fails, the
 *          partially filled object is still handed to the caller.
 *
 * The object always carries "type", "tableType", "tableName" and
 * "alterType"; the remaining keys depend on the alter action. Errors are
 * only logged.
 */
static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  cJSON*       json = NULL;
  cJSON*       tags = NULL;    // multi-tag array, owned here until attached to json
  cJSON*       member = NULL;  // current multi-tag entry, owned here until attached to tags
  int32_t      code = 0;       // assigned by the RAW_*_CHECK macros

  uDebug("alter table data:%p", metaRsp);
  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
    uError("tDecodeSVAlterTbReq error");
    goto end;
  }

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("alter");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type));
  // Tag-value updates are reported as "child"; every other action as "normal".
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
                                                vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL
                                            ? "child"
                                            : "normal");
  RAW_NULL_CHECK(tableType);
  RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType));
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
  RAW_NULL_CHECK(tableName);
  RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName));
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
  RAW_NULL_CHECK(alterType);
  RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "alterType", alterType));

  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      RAW_NULL_CHECK(colType);
      RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType));

      // Variable-length types also report a length with the header stripped.
      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
          vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(cbytes);
        RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(cbytes);
        RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
      }
      break;
    }
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      RAW_NULL_CHECK(colType);
      RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType));

      if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
          vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
        int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(cbytes);
        RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
      } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(cbytes);
        RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
      }
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
      RAW_NULL_CHECK(colType);
      RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType));
      if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_VARBINARY ||
          vAlterTbReq.colModType == TSDB_DATA_TYPE_GEOMETRY) {
        int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(cbytes);
        RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
      } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  cbytes = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(cbytes);
        RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
      RAW_NULL_CHECK(colNewName);
      RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colNewName", colNewName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
      RAW_NULL_CHECK(tagName);
      RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", tagName));

      bool isNull = vAlterTbReq.isNull;
      // Only look at the payload when the request is not already flagged
      // null: pTagVal starts out zero-initialized above and must not be
      // dereferenced unless it is set. A JSON tag without entries counts as null.
      if (!isNull && vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag == NULL || jsonTag->nTag == 0) isNull = true;
      }
      if (!isNull) {
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          if (!tTagIsJson(vAlterTbReq.pTagVal)) {
            uError("processAlterTable isJson false");
            goto end;
          }
          parseTagDatatoJson(vAlterTbReq.pTagVal, &buf);
          if (buf == NULL) {
            uError("parseTagDatatoJson failed, buf == NULL");
            goto end;
          }
        } else {
          int64_t bufSize = 0;
          if (vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY) {
            bufSize = vAlterTbReq.nTagVal * 2 + 2 + 3;
          } else {
            bufSize = vAlterTbReq.nTagVal + 3;
          }
          buf = taosMemoryCalloc(bufSize, 1);
          RAW_NULL_CHECK(buf);
          if (dataConverToStr(buf, bufSize, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL) !=
              TSDB_CODE_SUCCESS) {
            taosMemoryFree(buf);
            goto end;
          }
        }

        // buf is released right after cJSON_CreateString() has been given it,
        // before the result is checked.
        cJSON* colValue = cJSON_CreateString(buf);
        taosMemoryFree(buf);
        RAW_NULL_CHECK(colValue);
        RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colValue", colValue));
      }

      cJSON* isNullCJson = cJSON_CreateBool(isNull);
      RAW_NULL_CHECK(isNullCJson);
      RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colValueNull", isNullCJson));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: {
      int32_t nTags = taosArrayGetSize(vAlterTbReq.pMultiTag);
      if (nTags <= 0) {
        uError("processAlterTable parse multi tags error");
        goto end;
      }

      tags = cJSON_CreateArray();
      RAW_NULL_CHECK(tags);
      for (int32_t i = 0; i < nTags; i++) {
        member = cJSON_CreateObject();
        RAW_NULL_CHECK(member);

        SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
        RAW_NULL_CHECK(pTagVal);
        cJSON* tagName = cJSON_CreateString(pTagVal->tagName);
        RAW_NULL_CHECK(tagName);
        RAW_FALSE_CHECK(cJSON_AddItemToObject(member, "colName", tagName));

        // JSON tags are rejected in the multi-tag form.
        if (pTagVal->tagType == TSDB_DATA_TYPE_JSON) {
          uError("processAlterTable isJson false");
          goto end;
        }
        bool isNull = pTagVal->isNull;
        if (!isNull) {
          char*   buf = NULL;
          int64_t bufSize = 0;
          if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
            bufSize = pTagVal->nTagVal * 2 + 2 + 3;
          } else {
            bufSize = pTagVal->nTagVal + 3;
          }
          buf = taosMemoryCalloc(bufSize, 1);
          RAW_NULL_CHECK(buf);
          if (dataConverToStr(buf, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL) !=
              TSDB_CODE_SUCCESS) {
            taosMemoryFree(buf);
            goto end;
          }
          cJSON* colValue = cJSON_CreateString(buf);
          taosMemoryFree(buf);
          RAW_NULL_CHECK(colValue);
          RAW_FALSE_CHECK(cJSON_AddItemToObject(member, "colValue", colValue));
        }
        cJSON* isNullCJson = cJSON_CreateBool(isNull);
        RAW_NULL_CHECK(isNullCJson);
        RAW_FALSE_CHECK(cJSON_AddItemToObject(member, "colValueNull", isNullCJson));
        RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, member));
        member = NULL;  // now owned by tags
      }
      RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags));
      tags = NULL;  // now owned by json
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      break;
    }
    default:
      break;
  }

end:
  uDebug("alter table return");
  // Both are non-NULL only when an error interrupted the multi-tag branch
  // before ownership was handed over; cJSON_Delete(NULL) is a no-op.
  cJSON_Delete(member);
  cJSON_Delete(tags);
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
    taosArrayDestroy(vAlterTbReq.pMultiTag);
  }
  tDecoderClear(&decoder);
  *pJson = json;
}
24✔
839

840
/*
 * Turns a drop-super-table meta message into a cJSON object with the keys
 * "type" ("drop"), "tableType" ("super") and "tableName".
 *
 * metaRsp: message whose payload (after the SMsgHead) is an encoded
 *          SVDropStbReq.
 * pJson:   out parameter; always written. NULL when decoding or the creation
 *          of the root object failed; otherwise the object built so far.
 */
static void processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  SDecoder     dec = {0};
  SVDropStbReq dropReq = {0};
  cJSON*       root = NULL;
  int32_t      code = 0;  // assigned by the RAW_*_CHECK macros

  uDebug("processDropSTable data:%p", metaRsp);

  // The encoded request follows the message head.
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&dec, payload, payloadLen);
  if (tDecodeSVDropStbReq(&dec, &dropReq) < 0) {
    uError("tDecodeSVDropStbReq failed");
    goto end;
  }

  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);

  // String fields, emitted in this order.
  const struct {
    const char* key;
    const char* value;
  } fields[] = {
      {"type", "drop"},
      {"tableType", "super"},
      {"tableName", dropReq.name},
  };
  for (size_t i = 0; i < sizeof(fields) / sizeof(fields[0]); i++) {
    cJSON* item = cJSON_CreateString(fields[i].value);
    RAW_NULL_CHECK(item);
    RAW_FALSE_CHECK(cJSON_AddItemToObject(root, fields[i].key, item));
  }

end:
  uDebug("processDropSTable return");
  tDecoderClear(&dec);
  *pJson = root;
}
8✔
874
/*
 * Turns a delete-data meta message into a cJSON object with the keys "type"
 * ("delete") and "sql" (the equivalent DELETE statement).
 *
 * metaRsp: message whose payload (after the SMsgHead) is an encoded
 *          SDeleteRes.
 * pJson:   out parameter; always written. NULL when decoding, building the
 *          statement or creating the root object failed; otherwise the
 *          object built so far.
 *
 * The statement buffer is sized from the formatted length, so long table or
 * column names can never yield a silently truncated statement.
 */
static void processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  cJSON*     json = NULL;
  char*      sql = NULL;
  int32_t    code = 0;  // assigned by the RAW_*_CHECK macros

  uDebug("processDeleteTable data:%p", metaRsp);
  // decode and process req
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);

  tDecoderInit(&coder, data, len);
  if (tDecodeDeleteRes(&coder, &req) < 0) {
    uError("tDecodeDeleteRes failed");
    goto end;
  }

  //  getTbName(req.tableFName);
  const char* const sqlFmt = "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64;
  // First pass only measures; the return value excludes the terminating NUL.
  int sqlLen = snprintf(NULL, 0, sqlFmt, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  if (sqlLen < 0) {
    uError("processDeleteTable format sql failed");
    goto end;
  }
  sql = taosMemoryCalloc((int64_t)sqlLen + 1, 1);
  RAW_NULL_CHECK(sql);
  (void)snprintf(sql, (size_t)sqlLen + 1, sqlFmt, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("delete");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type));
  cJSON* sqlJson = cJSON_CreateString(sql);
  RAW_NULL_CHECK(sqlJson);
  RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", sqlJson));

end:
  uDebug("processDeleteTable return");
  taosMemoryFree(sql);
  tDecoderClear(&coder);
  *pJson = json;
}
3✔
910

911
/*
 * Turns a batched drop-table meta message into a cJSON object with the keys
 * "type" ("drop") and "tableNameList" (array holding one name per request).
 *
 * metaRsp: message whose payload (after the SMsgHead) is an encoded
 *          SVDropTbBatchReq.
 * pJson:   out parameter; always written. NULL when decoding or the creation
 *          of the root object failed; otherwise the object built so far.
 */
static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  SDecoder         decoder = {0};
  SVDropTbBatchReq req = {0};
  cJSON*           json = NULL;
  cJSON*           tableNameList = NULL;  // owned here until attached to json
  int32_t          code = 0;              // assigned by the RAW_*_CHECK macros

  uDebug("processDropTable data:%p", metaRsp);
  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
    uError("tDecodeSVDropTbBatchReq failed");
    goto end;
  }

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("drop");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type));
  tableNameList = cJSON_CreateArray();
  RAW_NULL_CHECK(tableNameList);
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;
    cJSON*       tableName = cJSON_CreateString(pDropTbReq->name);
    RAW_NULL_CHECK(tableName);
    if (!cJSON_AddItemToArray(tableNameList, tableName)) {
      // The item was not taken over by the array, so release it here.
      cJSON_Delete(tableName);
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }
  }
  RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableNameList", tableNameList));
  tableNameList = NULL;  // now owned by json

end:
  uDebug("processDropTable return");
  // Non-NULL only when an error occurred before the list was attached;
  // cJSON_Delete(NULL) is a no-op.
  cJSON_Delete(tableNameList);
  tDecoderClear(&decoder);
  *pJson = json;
}
5✔
947

948
/*
 * Replays a create-super-table meta message against the connected cluster.
 *
 * taos:    connection handle; read as a pointer to the int64_t id passed to
 *          buildRequest().
 * meta:    raw message; the encoded SVCreateStbReq follows the SMsgHead.
 * metaLen: total length of meta in bytes.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. When the request was launched,
 * the result is the request's own code unless the cached-meta removal that
 * follows a successful launch fails.
 */
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateStbReq req = {0};
  // Must be zero-initialized: the cleanup at `end` clears the decoder even
  // when the function fails before tDecoderInit() has run.
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // Without per-column compression settings in the message, defaults are
  // derived from each column's type.
  int8_t           createDefaultCompress = 0;
  SColCmprWrapper* p = &req.colCmpr;
  if (p->nCols == 0) {
    createDefaultCompress = 1;
  }
  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
  RAW_NULL_CHECK(pReq.pColumns);
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema*          pSchema = req.schemaRow.pSchema + i;
    SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);

    if (createDefaultCompress) {
      field.compress = createDefaultColCmprByType(pSchema->type);
    } else {
      SColCmpr* pCmp = &req.colCmpr.pColCmpr[i];
      field.compress = pCmp->alg;
    }
    RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field));
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  RAW_NULL_CHECK(pReq.pTags);
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
    RAW_NULL_CHECK(taosArrayPush(pReq.pTags, &field));
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = processSuid(req.suid, pRequest->pDb);
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;

  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // First call only computes the serialized length.
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  if (pCmdMsg.msgLen <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  if (tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    taosMemoryFree(pCmdMsg.pMsg);
    goto end;
  }

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest

  taosMemoryFree(pCmdMsg.pMsg);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // Drop any cached meta of the table that was just created.
    SCatalog* pCatalog = NULL;
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  return code;
}
1057

1058
/*
 * Replays a drop-super-table meta message against the connected cluster.
 *
 * taos:    connection handle; read as a pointer to the int64_t id passed to
 *          buildRequest().
 * meta:    raw message; the encoded SVDropStbReq follows the SMsgHead.
 * metaLen: total length of meta in bytes.
 *
 * A table that the catalog reports as non-existent is treated as already
 * dropped and yields TSDB_CODE_SUCCESS. Otherwise the uid found in the
 * catalog is used as the suid of the drop request.
 */
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropStbReq vnodeReq = {0};
  SDecoder     coder = {0};
  SMDropStbReq mnodeReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (pRequest->pDb == NULL) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // The encoded request follows the message head.
  void*   payload = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t payloadLen = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, payload, payloadLen);
  if (tDecodeSVDropStbReq(&coder, &vnodeReq) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // Look the table up first to learn the uid it has on this cluster.
  SName lookupName = {0};
  toName(pTscObj->acctId, pRequest->pDb, vnodeReq.name, &lookupName);
  STableMeta* pTableMeta = NULL;
  code = catalogGetTableMeta(pCatalog, &conn, &lookupName, &pTableMeta);
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    // Nothing to drop.
    code = TSDB_CODE_SUCCESS;
    taosMemoryFreeClear(pTableMeta);
    goto end;
  }
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }
  mnodeReq.suid = pTableMeta->uid;
  taosMemoryFreeClear(pTableMeta);

  // build drop stable
  mnodeReq.igNotExists = true;
  mnodeReq.source = TD_REQ_FROM_TAOX;

  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, vnodeReq.name,
         vnodeReq.suid, mnodeReq.suid);

  SName tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, vnodeReq.name, &tableName);
  if (tNameExtractFullName(&tableName, mnodeReq.name) != 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  SCmdMsgInfo cmdMsg = {0};
  cmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  cmdMsg.msgType = TDMT_MND_DROP_STB;
  // First call only computes the serialized length.
  cmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &mnodeReq);
  if (cmdMsg.msgLen <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  cmdMsg.pMsg = taosMemoryMalloc(cmdMsg.msgLen);
  RAW_NULL_CHECK(cmdMsg.pMsg);
  if (tSerializeSMDropStbReq(cmdMsg.pMsg, cmdMsg.msgLen, &mnodeReq) <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    taosMemoryFree(cmdMsg.pMsg);
    goto end;
  }

  SQuery query = {0};
  query.execMode = QUERY_EXEC_MODE_RPC;
  query.pCmdMsg = &cmdMsg;
  query.msgType = query.pCmdMsg->msgType;
  query.stableQuery = true;

  // The outcome is read from pRequest->code below.
  launchQueryImpl(pRequest, &query, true, NULL);
  taosMemoryFree(cmdMsg.pMsg);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // Drop the cached meta of the table that was just removed.
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  return code;
}
1155

1156
typedef struct SVgroupCreateTableBatch {
1157
  SVCreateTbBatchReq req;
1158
  SVgroupInfo        info;
1159
  char               dbName[TSDB_DB_NAME_LEN];
1160
} SVgroupCreateTableBatch;
1161

1162
static void destroyCreateTbReqBatch(void* data) {
133✔
1163
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
133✔
1164
  taosArrayDestroy(pTbBatch->req.pArray);
133✔
1165
}
133✔
1166

1167
/*
 * Replays a batched create-table meta message against the connected cluster.
 *
 * taos:    connection handle; read as a pointer to the int64_t id passed to
 *          buildRequest().
 * meta:    raw message; the encoded SVCreateTbBatchReq follows the SMsgHead.
 * metaLen: total length of meta in bytes.
 *
 * Every request is flagged "create if not exists" and grouped by the vgroup
 * its table name hashes to. For child tables the super-table uid is taken
 * from the local catalog and tag column ids are remapped by tag name to the
 * local schema; child tables whose super table does not exist are skipped.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;
  // Keeps rebuilt tags alive until the request has been sent; freed at `end`.
  SArray*            pTagList = taosArrayInit(0, POINTER_BYTES);
  RAW_NULL_CHECK(pTagList);
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
    // change tag cid to new cid
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      STableMeta* pTableMeta = NULL;
      SName       sName = {0};
      tb_uid_t    oldSuid = pCreateReq->ctb.suid;
      //      pCreateReq->ctb.suid = processSuid(pCreateReq->ctb.suid, pRequest->pDb);
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        // Super table is unknown here: skip this child table.
        code = TSDB_CODE_SUCCESS;
        taosMemoryFreeClear(pTableMeta);
        continue;
      }

      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
      pCreateReq->ctb.suid = pTableMeta->uid;

      SArray* pTagVals = NULL;
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
      if (code != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pTableMeta);
        goto end;
      }

      // Match each incoming tag by name against the local tag schema (stored
      // after the columns) and adopt the local column id where it differs.
      bool rebuildTag = false;
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
        if (tName == NULL) {
          continue;
        }
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
          SSchema* tag = &pTableMeta->schema[j];
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
            if (pTagVal) {
              if (pTagVal->cid != tag->colId) {
                pTagVal->cid = tag->colId;
                rebuildTag = true;
              }
            } else {
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
            }
          }
        }
      }
      taosMemoryFreeClear(pTableMeta);
      if (rebuildTag) {
        STag* ppTag = NULL;
        code = tTagNew(pTagVals, 1, false, &ppTag);
        taosArrayDestroy(pTagVals);
        pTagVals = NULL;
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        if (NULL == taosArrayPush(pTagList, &ppTag)) {
          // Report the failure instead of leaving code at TSDB_CODE_SUCCESS.
          code = terrno;
          tTagFree(ppTag);
          goto end;
        }
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
      }
      taosArrayDestroy(pTagVals);
    }
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // Until the batch is stored in the map, its array is owned here and
      // must be released on failure.
      if (taosArrayPush(tBatch.req.pArray, pCreateReq) == NULL) {
        code = terrno;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      tBatch.req.source = TD_REQ_FROM_TAOX;
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
    }
  }

  // Every request was skipped: nothing to send.
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_NULL_CHECK(pQuery->pRoot);

  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteSVCreateTbBatchReq(&req);

  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  taosArrayDestroyP(pTagList, taosMemoryFree);
  return code;
}
1341

1342
typedef struct SVgroupDropTableBatch {
1343
  SVDropTbBatchReq req;
1344
  SVgroupInfo      info;
1345
  char             dbName[TSDB_DB_NAME_LEN];
1346
} SVgroupDropTableBatch;
1347

1348
static void destroyDropTbReqBatch(void* data) {
3✔
1349
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
3✔
1350
  taosArrayDestroy(pTbBatch->req.pArray);
3✔
1351
}
3✔
1352

1353
/*
 * Decode a serialized SVDropTbBatchReq located after the SMsgHead in `meta`
 * and drop the listed tables in the connection's current database.
 *
 * Every request is routed to its vgroup through the catalog, grouped per
 * vgroup, serialized and submitted as a single TDMT_VND_DROP_TABLE query.
 * Tables the catalog reports as TSDB_CODE_PAR_TABLE_NOT_EXIST are skipped.
 *
 * @param taos     connection handle
 * @param meta     raw message: SMsgHead followed by the encoded request
 * @param metaLen  total length of `meta` in bytes
 * @return TSDB_CODE_SUCCESS, or the error code of the first failing step
 */
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // a payload shorter than the message head cannot contain a request
  if (meta == NULL || metaLen < (int32_t)sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));

  // vgId -> SVgroupDropTableBatch; the free callback releases each batch's request array
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop over the tables to drop and group them by vgroup
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;
    //    pDropReq->suid = processSuid(pDropReq->suid, pRequest->pDb);

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      // nothing to drop for this entry
      code = TSDB_CODE_SUCCESS;
      taosMemoryFreeClear(pTableMeta);
      continue;
    }
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
    // replace the suid carried by the message with the one known to the local catalog
    tb_uid_t oldSuid = pDropReq->suid;
    pDropReq->suid = pTableMeta->suid;
    taosMemoryFreeClear(pTableMeta);
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
           pDropReq->suid);

    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pDropReq));
      RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)));
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
    }
  }

  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }

  // Build the query nodes before serializing: the serialized buffers are not
  // released at `end`, so nothing that can fail may sit between their creation
  // and the hand-over to rewriteToVnodeModifyOpStmt().
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;

  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }
  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}
1464

1465
/*
 * Decode a serialized SDeleteRes located after the SMsgHead in `meta` and
 * replay it as a "delete from ... where ts >= skey and ts <= ekey" statement.
 *
 * TSDB_CODE_PAR_TABLE_NOT_EXIST and TSDB_CODE_PAR_GET_META_ERROR reported by
 * the query are treated as success.
 *
 * @param taos     connection handle
 * @param meta     raw message: SMsgHead followed by the encoded delete result
 * @param metaLen  total length of `meta` in bytes
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  // NOTE(review): enlarged from 256 to leave headroom for long table/column names;
  // the statement is still checked for truncation below.
  char       sql[1024] = {0};
  int32_t    code = TSDB_CODE_SUCCESS;

  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);

  // a payload shorter than the message head cannot contain a request
  if (meta == NULL || metaLen < (int32_t)sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeDeleteRes(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  int32_t sqlLen = snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                            req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  if (sqlLen < 0 || sqlLen >= (int32_t)sizeof(sql)) {
    // Never execute a truncated statement: a cut-off WHERE clause could match
    // more rows than the original delete did.
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
  RAW_NULL_CHECK(res);
  SRequestObj* pRequest = (SRequestObj*)res;
  code = pRequest->code;
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
    // the target table is gone on this side, so there is nothing to delete
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);

end:
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));
  tDecoderClear(&coder);
  return code;
}
1499

1500
/*
 * Decode a serialized SVAlterTbReq located after the SMsgHead in `meta`,
 * re-encode it for the vgroup that owns the table and submit it as a
 * TDMT_VND_ALTER_TABLE query.
 *
 * Requests with action TSDB_ALTER_TABLE_UPDATE_OPTIONS are ignored, and a
 * TSDB_CODE_TDB_TABLE_NOT_EXIST result is reported as success.
 *
 * @param taos     connection handle
 * @param meta     raw message: SMsgHead followed by the encoded request
 * @param metaLen  total length of `meta` in bytes
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
  SVAlterTbReq   req = {0};
  SDecoder       dcoder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // a payload shorter than the message head cannot contain a request
  if (meta == NULL || metaLen < (int32_t)sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // decode and process req
  void*   data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  int32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);
  if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
  pArray = taosArrayInit(1, sizeof(void*));
  RAW_NULL_CHECK(pArray);

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  RAW_NULL_CHECK(pVgData);
  pVgData->vg = pInfo;

  // first pass computes the encoded size, second pass writes the message
  int tlen = 0;
  req.source = TD_REQ_FROM_TAOX;
  tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
  if (code != 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }
  tlen += sizeof(SMsgHead);
  void* pMsg = taosMemoryMalloc(tlen);
  RAW_NULL_CHECK(pMsg);
  // Attach the buffer to pVgData right away so the cleanup at `end` frees it
  // on every later failure, including a failed encode.
  pVgData->pData = pMsg;
  pVgData->size = tlen;
  pVgData->numOfTables = 1;

  ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
  ((SMsgHead*)pMsg)->contLen = htonl(tlen);
  void*    pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  SEncoder coder = {0};
  tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
  code = tEncodeSVAlterTbReq(&coder, &req);
  tEncoderClear(&coder);
  if (code != 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto end;
  }

  RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));

  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code || NULL == pQuery) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);

  // the data blocks were handed to pQuery above; do not free them again at `end`
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
    code = TSDB_CODE_SUCCESS;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
    if (pRes->res != NULL) {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
end:
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&dcoder);
  qDestroyQuery(pQuery);
  return code;
}
1614

1615
// Convenience wrapper: same as taos_write_raw_block_with_fields_with_reqid() called with a request id of 0.
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
                                     int numFields) {
  const int64_t reqid = 0;
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, reqid);
}
1619

1620
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
1✔
1621
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
1622
  if (!taos || !pData || !tbname) {
1!
1623
    return TSDB_CODE_INVALID_PARA;
×
1624
  }
1625
  int32_t     code = TSDB_CODE_SUCCESS;
1✔
1626
  STableMeta* pTableMeta = NULL;
1✔
1627
  SQuery*     pQuery = NULL;
1✔
1628
  SHashObj*   pVgHash = NULL;
1✔
1629

1630
  SRequestObj* pRequest = NULL;
1✔
1631
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
1!
1632

1633
  uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
1!
1634
         rows, pData, tbname, fields, numFields);
1635

1636
  pRequest->syncQuery = true;
1✔
1637
  if (!pRequest->pDb) {
1!
1638
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1639
    goto end;
×
1640
  }
1641

1642
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
1✔
1643
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
1✔
1644
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
1✔
1645

1646
  struct SCatalog* pCatalog = NULL;
1✔
1647
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
1!
1648

1649
  SRequestConnInfo conn = {0};
1✔
1650
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
1✔
1651
  conn.requestId = pRequest->requestId;
1✔
1652
  conn.requestObjRefId = pRequest->self;
1✔
1653
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
1✔
1654

1655
  SVgroupInfo vgData = {0};
1✔
1656
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
1!
1657
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
1!
1658
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
1!
1659
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
1✔
1660
  RAW_NULL_CHECK(pVgHash);
1!
1661
  RAW_RETURN_CHECK(
1!
1662
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1663
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
1!
1664
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
1!
1665

1666
  launchQueryImpl(pRequest, pQuery, true, NULL);
1✔
1667
  code = pRequest->code;
1✔
1668

1669
end:
1✔
1670
  uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code));
1!
1671
  taosMemoryFreeClear(pTableMeta);
1!
1672
  qDestroyQuery(pQuery);
1✔
1673
  destroyRequest(pRequest);
1✔
1674
  taosHashCleanup(pVgHash);
1✔
1675
  return code;
1✔
1676
}
1677

1678
// Convenience wrapper: same as taos_write_raw_block_with_reqid() called with a request id of 0.
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  const int64_t reqid = 0;
  return taos_write_raw_block_with_reqid(taos, rows, pData, tbname, reqid);
}
1681

1682
/*
 * Write one raw data block into table `tbname` of the connection's current
 * database. No field list is supplied to rawBlockBindData().
 *
 * @param taos    connection handle
 * @param rows    row count (only logged here)
 * @param pData   raw block
 * @param tbname  target table name
 * @param reqid   request id forwarded to buildRequest()
 * @return TSDB_CODE_SUCCESS or an error code; TSDB_CODE_INVALID_PARA when
 *         taos, pData or tbname is NULL
 */
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
  if (taos == NULL || pData == NULL || tbname == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;
  SQuery*      pQuery = NULL;
  STableMeta*  pTableMeta = NULL;
  SHashObj*    pVgHash = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));

  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  // fully qualified name: account id + current database + table
  SName tbFullName = {TSDB_TABLE_NAME_T, pTscObj->acctId, {0}, {0}};
  tstrncpy(tbFullName.dbname, pRequest->pDb, sizeof(tbFullName.dbname));
  tstrncpy(tbFullName.tname, tbname, sizeof(tbFullName.tname));

  struct SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo vgInfo = {0};
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &tbFullName, &vgInfo));
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &tbFullName, &pTableMeta));
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));

  // single-entry map vgId -> vgroup info consumed by smlBuildOutput()
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgHash);
  RAW_RETURN_CHECK(
      taosHashPut(pVgHash, (const char*)&vgInfo.vgId, sizeof(vgInfo.vgId), (char*)&vgInfo, sizeof(vgInfo)));

  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false));
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  taosMemoryFreeClear(pTableMeta);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  taosHashCleanup(pVgHash);
  return code;
}
1737

1738
static void* getRawDataFromRes(void* pRetrieve) {
128✔
1739
  void* rawData = NULL;
128✔
1740
  // deal with compatibility
1741
  if (*(int64_t*)pRetrieve == 0) {
128!
1742
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
1743
  } else if (*(int64_t*)pRetrieve == 1) {
128!
1744
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
128✔
1745
  }
1746
  return rawData;
128✔
1747
}
1748

1749
/*
 * Decode every create-table request carried by `rsp` and store it in
 * `pHashObj`, keyed by table name. Only the first request seen for a name is
 * kept; later duplicates are destroyed.
 *
 * The hash receives a shallow copy of each SVCreateTbReq and thereby takes
 * over whatever the decoded request owns.
 *
 * @return 0 on success; TSDB_CODE_INVALID_MSG if a request is not for a child
 *         table; otherwise the error code of the failing step
 */
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
  // find schema data info
  int32_t       code = 0;
  SVCreateTbReq pCreateReq = {0};
  SDecoder      decoderTmp = {0};

  for (int j = 0; j < rsp->createTableNum; j++) {
    void** dataTmp = taosArrayGet(rsp->createTableReq, j);
    RAW_NULL_CHECK(dataTmp);
    int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j);
    RAW_NULL_CHECK(lenTmp);

    tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));

    if (pCreateReq.type != TSDB_CHILD_TABLE) {
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }

    size_t nameLen = strlen(pCreateReq.name);
    if (taosHashGet(pHashObj, pCreateReq.name, nameLen) == NULL) {
      RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, nameLen, &pCreateReq, sizeof(SVCreateTbReq)));
    } else {
      tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
    }
    // The request is now either owned by the hash entry or already destroyed.
    // Clear the local copy so a failure in a later iteration cannot destroy
    // it a second time at `end`.
    pCreateReq = (SVCreateTbReq){0};

    tDecoderClear(&decoderTmp);
  }
  return 0;

end:
  tDecoderClear(&decoderTmp);
  tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
  return code;
}
1785

1786
// States of the one-time initialisation tracked by `initedFlag`.
typedef enum {
  WRITE_RAW_INIT_START = 0,  // initial value of initedFlag
  WRITE_RAW_INIT_OK = 1,
  WRITE_RAW_INIT_FAIL = 2,
} WRITE_RAW_INIT_STATUS;
1791

1792
static SHashObj* writeRawCache = NULL;
1793
static int8_t    initFlag = 0;
1794
static int8_t    initedFlag = WRITE_RAW_INIT_START;
1795

1796
typedef struct {
1797
  SHashObj* pVgHash;
1798
  SHashObj* pNameHash;
1799
  SHashObj* pMetaHash;
1800
} rawCacheInfo;
1801

1802
typedef struct {
1803
  SVgroupInfo vgInfo;
1804
  int64_t     uid;
1805
  int64_t     suid;
1806
} tbInfo;
1807

1808
static void tmqFreeMeta(void* data) {
41✔
1809
  STableMeta* pTableMeta = *(STableMeta**)data;
41✔
1810
  taosMemoryFree(pTableMeta);
41✔
1811
}
41✔
1812

1813
static void freeRawCache(void* data) {
×
1814
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
1815
  taosHashCleanup(pRawCache->pMetaHash);
×
1816
  taosHashCleanup(pRawCache->pNameHash);
×
1817
  taosHashCleanup(pRawCache->pVgHash);
×
1818
}
×
1819

1820
static int32_t initRawCacheHash() {
15✔
1821
  if (writeRawCache == NULL) {
15!
1822
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
15✔
1823
    if (writeRawCache == NULL) {
15!
1824
      return terrno;
×
1825
    }
1826
    taosHashSetFreeFp(writeRawCache, freeRawCache);
15✔
1827
  }
1828
  return 0;
15✔
1829
}
1830

1831
/*
 * Decide whether the cached table meta no longer matches an incoming raw block.
 *
 * A refresh is needed when the schema wrapper and the meta disagree on the
 * column count, when a column named in `pSW` is missing from the meta, or
 * when the type / byte width recorded in the block for that column differs
 * from the meta.
 *
 * @param rawData     raw block; its column schema follows the block header
 * @param pTableMeta  cached table meta
 * @param pSW         schema (column names) describing the block's columns
 * @return true if the meta must be fetched again
 */
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
  // column length |
  // The column schema starts after five int32 fields and one uint64 field.
  const size_t headerLen = 5 * sizeof(int32_t) + sizeof(uint64_t);
  // one column schema entry: int8 type followed by int32 byte width
  const size_t entryLen = sizeof(int8_t) + sizeof(int32_t);
  const char*  fields = (const char*)rawData + headerLen;

  if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
    return true;
  }

  for (int i = 0; i < pSW->nCols; i++, fields += entryLen) {
    const char* fieldName = pSW->pSchema[i].name;

    // locate the meta column carrying the same name
    int j = 0;
    for (; j < pTableMeta->tableInfo.numOfColumns; j++) {
      if (strcmp(pTableMeta->schema[j].name, fieldName) == 0) {
        break;
      }
    }
    if (j == pTableMeta->tableInfo.numOfColumns) {
      return true;
    }

    SSchema* pColSchema = &pTableMeta->schema[j];
    int8_t   colType = (int8_t)fields[0];
    int32_t  colBytes = 0;
    // the width sits at an odd offset inside the block; copy it out instead of
    // dereferencing a misaligned int32_t pointer
    memcpy(&colBytes, fields + sizeof(int8_t), sizeof(colBytes));

    if (colType != pColSchema->type || colBytes != pColSchema->bytes) {
      return true;
    }
  }
  return false;
}
1865

1866
/*
 * Look up, or create and register, the cache tables associated with `key`
 * in writeRawCache. The key is the pointer value itself.
 *
 * On success the three out-parameters point at tables owned by writeRawCache.
 * On failure any table created here is destroyed and the out-parameters are
 * set to NULL.
 *
 * @return 0 on success, otherwise the error code of the failing step
 */
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
  int32_t   code = 0;
  SHashObj* vgHash = NULL;
  SHashObj* nameHash = NULL;
  SHashObj* metaHash = NULL;

  rawCacheInfo* cached = (rawCacheInfo*)taosHashGet(writeRawCache, &key, POINTER_BYTES);
  if (cached != NULL) {
    *pVgHash = cached->pVgHash;
    *pNameHash = cached->pNameHash;
    *pMetaHash = cached->pMetaHash;
    return 0;
  }

  vgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(vgHash);
  nameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(nameHash);
  metaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(metaHash);
  taosHashSetFreeFp(metaHash, tmqFreeMeta);

  rawCacheInfo info = {vgHash, nameHash, metaHash};
  RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));

  // publish only after the cache entry owns the tables
  *pVgHash = vgHash;
  *pNameHash = nameHash;
  *pMetaHash = metaHash;
  return 0;

end:
  // Clean up through the locals so the result does not depend on what the
  // caller left in the out-parameters, and never hand back freed tables.
  taosHashCleanup(metaHash);
  taosHashCleanup(nameHash);
  taosHashCleanup(vgHash);
  *pVgHash = NULL;
  *pNameHash = NULL;
  *pMetaHash = NULL;
  return code;
}
1893

1894
/*
 * Create a synchronous request on `taos`, fetch the catalog handle of its
 * cluster and fill `conn` with the transport / request identifiers.
 *
 * @return 0 on success; TSDB_CODE_PAR_DB_NOT_SPECIFIED if the request has no
 *         database; otherwise the error code of the failing step. *pRequest is
 *         released by the caller.
 */
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
  int32_t code = 0;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));

  SRequestObj* req = *pRequest;
  req->syncQuery = true;
  if (!req->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  STscObj* pTscObj = req->pTscObj;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, pCatalog));

  conn->pTrans = pTscObj->pAppInfo->pTransporter;
  conn->requestId = req->requestId;
  conn->requestObjRefId = req->self;
  conn->mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

end:
  return code;
}
1912

1913
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
1914
static int32_t  decodeRawData(SDecoder* decoder, void* data, int32_t dataLen, _raw_decode_func_ func,
47✔
1915
                              SMqRspObj* rspObj) {
1916
   int8_t dataVersion = *(int8_t*)data;
47✔
1917
   if (dataVersion >= MQ_DATA_RSP_VERSION) {
47!
1918
     data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
47✔
1919
     dataLen -= sizeof(int8_t) + sizeof(int32_t);
47✔
1920
  }
1921

1922
   rspObj->resIter = -1;
47✔
1923
   tDecoderInit(decoder, data, dataLen);
47✔
1924
   int32_t code = func(decoder, &rspObj->dataRsp);
47✔
1925
   if (code != 0) {
47!
1926
     SET_ERROR_MSG("decode mq taosx data rsp failed");
×
1927
  }
1928
   return code;
47✔
1929
}
1930

1931
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
128✔
1932
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
1933
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
1934
  int32_t     code = 0;
128✔
1935
  STableMeta* pTableMeta = NULL;
128✔
1936
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
128✔
1937
  if (tmpInfo == NULL || retry > 0) {
128!
1938
    tbInfo info = {0};
121✔
1939

1940
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
121!
1941
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
121!
1942
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
24✔
1943
    }
1944
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
121!
1945
    info.uid = pTableMeta->uid;
121✔
1946
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
121✔
1947
      info.suid = pTableMeta->suid;
82✔
1948
    } else {
1949
      info.suid = pTableMeta->uid;
39✔
1950
    }
1951
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
121✔
1952
    if (code != 0) {
121!
1953
      taosMemoryFree(pTableMeta);
×
1954
      goto end;
×
1955
    }
1956
    if (pCreateReqDst) {
121✔
1957
      pTableMeta->vgId = info.vgInfo.vgId;
24✔
1958
      pTableMeta->uid = pCreateReqDst->uid;
24✔
1959
      pCreateReqDst->ctb.suid = pTableMeta->suid;
24✔
1960
    }
1961

1962
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
121!
1963
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
121✔
1964
    RAW_RETURN_CHECK(
121!
1965
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
1966
  }
1967

1968
  if (pTableMeta == NULL || retry > 0) {
128!
1969
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
7✔
1970
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
7!
1971
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
3!
1972
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
3✔
1973
      if (code != 0) {
3!
1974
        taosMemoryFree(pTableMeta);
×
1975
        goto end;
×
1976
      }
1977

1978
    } else {
1979
      pTableMeta = *pTableMetaTmp;
4✔
1980
      pTableMeta->uid = tmpInfo->uid;
4✔
1981
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
4✔
1982
    }
1983
  }
1984
  *pMeta = pTableMeta;
128✔
1985

1986
end:
128✔
1987
  return code;
128✔
1988
}
1989

1990
/*
 * Decode a raw data response and write all of its blocks through one
 * schemaless-style insert query.
 *
 * Each block is bound against the table meta resolved by processCacheMeta().
 * If the query fails with an error that NEED_CLIENT_HANDLE_ERROR() accepts,
 * the whole write is rebuilt and retried, up to three times, with the retry
 * count forwarded to processCacheMeta().
 *
 * @param taos     connection handle (also the key of the raw cache)
 * @param data     encoded response
 * @param dataLen  length of `data` in bytes
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SQuery*          pQuery = NULL;
  SRequestConnInfo conn = {0};
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));

  SHashObj* pVgHash = NULL;
  SHashObj* pNameHash = NULL;
  SHashObj* pMetaHash = NULL;
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  SMqDataRsp* rsp = &rspObj.dataRsp;
  int         retry = 0;
  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rsp->blockNum);

    for (rspObj.resIter++; rspObj.resIter < rsp->blockNum; rspObj.resIter++) {
      if (!rsp->withSchema) {
        goto end;
      }

      const int32_t idx = rspObj.resIter;

      const char* tbName = (const char*)taosArrayGetP(rsp->blockTbName, idx);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rsp->blockSchema, idx);
      RAW_NULL_CHECK(pSW);
      void* pRetrieve = taosArrayGetP(rsp->blockData, idx);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName tbFullName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(tbFullName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(tbFullName.tname, tbName, TSDB_TABLE_NAME_LEN);

      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &tbFullName, &pTableMeta,
                                        pSW, rawData, retry));

      char err[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", tbFullName.tname, err);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    if (!NEED_CLIENT_HANDLE_ERROR(code) || retry++ >= 3) {
      break;
    }

    // start over with a fresh query and rewind the block iterator
    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;
  }

end:
  uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  return code;
}
2062

2063
// Writes a raw TMQ "metadata" payload (data blocks plus the create-table
// requests that accompany them) into the database behind `taos`.
//
// The payload is decoded with tDecodeSTaosxRsp. For every data block the
// matching create-table request is looked up by table name, the table meta is
// resolved through processCacheMeta, and the block is bound into a single
// query that is launched once all blocks are bound. When the launch ends with
// a code matching NEED_CLIENT_HANDLE_ERROR, the bind-and-launch pass is redone
// from the first block, for at most three retries.
//
// Returns TSDB_CODE_SUCCESS, or the code of the first failing step. Everything
// acquired in this function is released at the `end` label.
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};
  SQuery*          pQuery = NULL;
  SHashObj*        pCreateTbHash = NULL;
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;
  int              retry = 0;

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));

  pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pCreateTbHash);
  RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));

  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);

    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      // Without schema information the blocks cannot be bound; stop here.
      if (!rspObj.dataRsp.withSchema) {
        goto end;
      }

      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
      RAW_NULL_CHECK(pSW);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);

      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // The create-table request (if any) is keyed by the table name.
      SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, pName.tname, strlen(pName.tname));
      STableMeta*    pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &pName,
                                        &pTableMeta, pSW, rawData, retry));

      char err[ERR_MSG_LEN] = {0};
      code =
          rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // `retry` is only advanced when the error is one the client may handle.
    if (!(NEED_CLIENT_HANDLE_ERROR(code)) || retry++ >= 3) {
      break;
    }

    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;  // restart from the first block
  }

end:
  uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteSTaosxRsp(&rspObj.dataRsp);
  for (void* pIter = taosHashIterate(pCreateTbHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pCreateTbHash, pIter)) {
    tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
  }
  taosHashCleanup(pCreateTbHash);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  return code;
}
2150

2151
// Hands one meta response to the JSON builder that matches its message type.
//
// `meta` is forwarded unchanged to the selected builder. When `resMsgType` is
// none of the types listed below, no builder runs and `*meta` is not written.
static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
  if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) {
    processCreateStb(pMetaRsp, meta);
  } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_STB) {
    processAlterStb(pMetaRsp, meta);
  } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_STB) {
    processDropSTable(pMetaRsp, meta);
  } else if (pMetaRsp->resMsgType == TDMT_VND_CREATE_TABLE) {
    processCreateTable(pMetaRsp, meta);
  } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_TABLE) {
    processAlterTable(pMetaRsp, meta);
  } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_TABLE) {
    processDropTable(pMetaRsp, meta);
  } else if (pMetaRsp->resMsgType == TDMT_VND_DELETE) {
    processDeleteTable(pMetaRsp, meta);
  }
}
289✔
2170

2171
// Converts a batch meta response into one JSON document of the form
//   {"tmq_meta_version": TMQ_META_VERSION, "metas": [ ...one item per meta... ]}
// and stores the unformatted text in `*string`.
//
// On any failure `*string` is left untouched. Every cJSON node, decoded
// response and decoder created here is released on both the success and the
// failure path.
static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
  SDecoder        coder = {0};
  SMqBatchMetaRsp rsp = {0};
  int32_t         code = 0;  // written by the RAW_*_CHECK macros
  cJSON*          pJson = NULL;
  cJSON*          pMetaArr = NULL;  // owned here until attached to pJson

  tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
  if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
    goto end;
  }

  pJson = cJSON_CreateObject();
  RAW_NULL_CHECK(pJson);
  RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION));
  pMetaArr = cJSON_CreateArray();
  RAW_NULL_CHECK(pMetaArr);

  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // Each entry starts with an SMqRspHead; a shorter entry would make the
    // size computation below wrap around.
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }

    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
      // Release whatever the failed decode may already have produced.
      tDeleteMqMetaRsp(&metaRsp);
      tDecoderClear(&metaCoder);
      goto end;
    }

    cJSON* pItem = NULL;
    processSimpleMeta(&metaRsp, &pItem);
    tDeleteMqMetaRsp(&metaRsp);
    tDecoderClear(&metaCoder);

    if (!cJSON_AddItemToArray(pMetaArr, pItem)) {
      // The array did not take ownership, so the item is still ours to free.
      cJSON_Delete(pItem);
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }
  }

  RAW_FALSE_CHECK(cJSON_AddItemToObject(pJson, "metas", pMetaArr));
  pMetaArr = NULL;  // now owned by pJson; freed together with it

  *string = cJSON_PrintUnformatted(pJson);

end:
  cJSON_Delete(pMetaArr);
  cJSON_Delete(pJson);
  tDeleteMqBatchMetaRsp(&rsp);
  tDecoderClear(&coder);
}
2215

2216
// Renders the meta information carried by a TMQ result as a JSON string.
//
// Returns NULL when `res` is NULL, when `res` is none of the META, METADATA or
// BATCH_META result kinds, or when the selected builder produced no string.
char* tmq_get_json_meta(TAOS_RES* res) {
  if (res == NULL) return NULL;
  uDebug("tmq_get_json_meta res:%p", res);
  if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res) && !TD_RES_TMQ_BATCH_META(res)) {
    return NULL;
  }

  char*      string = NULL;
  SMqRspObj* rspObj = (SMqRspObj*)res;
  if (TD_RES_TMQ_METADATA(res)) {
    processAutoCreateTable(&rspObj->dataRsp, &string);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    processBatchMetaToJson(&rspObj->batchMetaRsp, &string);
  } else if (TD_RES_TMQ_META(res)) {
    cJSON* pJson = NULL;
    processSimpleMeta(&rspObj->metaRsp, &pJson);
    string = cJSON_PrintUnformatted(pJson);
    cJSON_Delete(pJson);
  } else {
    uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res);
  }

  // `string` may still be NULL here; a NULL argument for "%s" is undefined
  // behaviour, so substitute a printable placeholder for the log only.
  uDebug("tmq_get_json_meta string:%s", string != NULL ? string : "(null)");
  return string;
}
2241

2242
// Frees a JSON meta string through taosMemoryFreeClear.
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}
228!
2243

2244
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
47✔
2245
  SEncoder coder = {0};
47✔
2246
  tEncoderInit(&coder, NULL, 0);
47✔
2247
  if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1;
47!
2248
  if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1;
47!
2249
  int32_t pos = coder.pos;
47✔
2250
  tEncoderClear(&coder);
47✔
2251
  return pos;
47✔
2252
}
2253

2254
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
2255
static int32_t  encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
47✔
2256
   int32_t  len = 0;
47✔
2257
   int32_t  code = 0;
47✔
2258
   SEncoder encoder = {0};
47✔
2259
   void*    buf = NULL;
47✔
2260
   tEncodeSize(encodeFunc, rspObj, len, code);
47!
2261
   if (code < 0) {
47!
2262
     code = TSDB_CODE_INVALID_MSG;
×
2263
     goto FAILED;
×
2264
  }
2265
   len += sizeof(int8_t) + sizeof(int32_t);
47✔
2266
   buf = taosMemoryCalloc(1, len);
47✔
2267
   if (buf == NULL) {
47!
2268
     code = terrno;
×
2269
     goto FAILED;
×
2270
  }
2271
   tEncoderInit(&encoder, buf, len);
47✔
2272
   if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
47!
2273
     code = TSDB_CODE_INVALID_MSG;
×
2274
     goto FAILED;
×
2275
  }
2276
   int32_t offsetLen = getOffSetLen(rspObj);
47✔
2277
   if (offsetLen <= 0) {
47!
2278
     code = TSDB_CODE_INVALID_MSG;
×
2279
     goto FAILED;
×
2280
  }
2281
   if (tEncodeI32(&encoder, offsetLen) < 0) {
47!
2282
     code = TSDB_CODE_INVALID_MSG;
×
2283
     goto FAILED;
×
2284
  }
2285
   if (encodeFunc(&encoder, rspObj) < 0) {
47!
2286
     code = TSDB_CODE_INVALID_MSG;
×
2287
     goto FAILED;
×
2288
  }
2289
   tEncoderClear(&encoder);
47✔
2290

2291
   raw->raw = buf;
47✔
2292
   raw->raw_len = len;
47✔
2293
   return code;
47✔
2294
FAILED:
×
2295
  tEncoderClear(&encoder);
×
2296
  taosMemoryFree(buf);
×
2297
  return code;
×
2298
}
2299

2300
// Fills `raw` with the raw representation of a TMQ result.
//
// - META and BATCH_META results: `raw->raw` points at the buffer already held
//   by `res`; nothing is copied.
// - Data and METADATA results: the response is re-encoded by encodeMqDataRsp
//   into a newly allocated buffer, which tmq_free_raw releases.
//
// Returns TSDB_CODE_INVALID_PARA when `raw` or `res` is NULL, the encoder's
// error code when encoding fails, TSDB_CODE_TMQ_INVALID_MSG for any other
// result kind, and TSDB_CODE_SUCCESS otherwise.
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (!raw || !res) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqRspObj* rspObj = ((SMqRspObj*)res);
  if (TD_RES_TMQ_META(res)) {
    raw->raw = rspObj->metaRsp.metaRsp;
    raw->raw_len = rspObj->metaRsp.metaRspLen;
    raw->raw_type = rspObj->metaRsp.resMsgType;
    uDebug("tmq get raw type meta:%p", raw);
  } else if (TD_RES_TMQ(res)) {
    int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw);
    if (code != 0) {
      // Log the code that is actually returned; encodeMqDataRsp does not set
      // terrno on its TSDB_CODE_INVALID_MSG paths.
      uError("tmq get raw type error:%d", code);
      return code;
    }
    raw->raw_type = RES_TYPE__TMQ;
    uDebug("tmq get raw type data:%p", raw);
  } else if (TD_RES_TMQ_METADATA(res)) {
    int32_t code = encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw);
    if (code != 0) {
      uError("tmq get raw type error:%d", code);
      return code;
    }
    raw->raw_type = RES_TYPE__TMQ_METADATA;
    uDebug("tmq get raw type metadata:%p", raw);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    raw->raw = rspObj->batchMetaRsp.pMetaBuff;
    raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw batch meta:%p", raw);
  } else {
    uError("tmq get raw error type:%d", *(int8_t*)res);
    return TSDB_CODE_TMQ_INVALID_MSG;
  }
  return TSDB_CODE_SUCCESS;
}
2337

2338
// Releases the buffer of a raw data descriptor when its type is
// RES_TYPE__TMQ or RES_TYPE__TMQ_METADATA; buffers of any other type are left
// alone. The error message buffer `terrMsg` is zeroed in every case.
void tmq_free_raw(tmq_raw_data raw) {
  uDebug("tmq free raw data type:%d", raw.raw_type);

  const bool ownsBuffer = (raw.raw_type == RES_TYPE__TMQ) || (raw.raw_type == RES_TYPE__TMQ_METADATA);
  if (ownsBuffer) {
    taosMemoryFree(raw.raw);
  }

  (void)memset(terrMsg, 0, ERR_MSG_LEN);
}
246✔
2345

2346
static int32_t writeRawInit() {
350✔
2347
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
365✔
2348
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
15✔
2349
    if (old == 0) {
15!
2350
      int32_t code = initRawCacheHash();
15✔
2351
      if (code != 0) {
15!
2352
        uError("tmq writeRawImpl init error:%d", code);
×
2353
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
2354
        return code;
×
2355
      }
2356
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
15✔
2357
    }
2358
  }
2359

2360
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
350!
2361
    return TSDB_CODE_INTERNAL_ERROR;
×
2362
  }
2363
  return 0;
350✔
2364
}
2365

2366
// Routes a raw buffer to the writer that matches `type`.
//
// Returns TSDB_CODE_INTERNAL_ERROR when writeRawInit() fails,
// TSDB_CODE_INVALID_PARA for a type with no writer, and otherwise whatever the
// selected writer returns.
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
  if (writeRawInit() != 0) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  // Create and alter of a super table are both handled by taosCreateStb.
  if (type == TDMT_VND_CREATE_STB || type == TDMT_VND_ALTER_STB) {
    return taosCreateStb(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_STB) {
    return taosDropStb(taos, buf, len);
  }
  if (type == TDMT_VND_CREATE_TABLE) {
    return taosCreateTable(taos, buf, len);
  }
  if (type == TDMT_VND_ALTER_TABLE) {
    return taosAlterTable(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_TABLE) {
    return taosDropTable(taos, buf, len);
  }
  if (type == TDMT_VND_DELETE) {
    return taosDeleteData(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_METADATA) {
    return tmqWriteRawMetaDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ) {
    return tmqWriteRawDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_BATCH_META) {
    return tmqWriteBatchMetaDataImpl(taos, buf, len);
  }

  return TSDB_CODE_INVALID_PARA;
}
2394

2395
// Public entry point for writing a raw TMQ payload into `taos`.
//
// Rejects a NULL connection, a NULL buffer or a non-positive length with
// TSDB_CODE_INVALID_PARA (and records an error message); otherwise forwards
// the payload to writeRawImpl and returns its result.
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
  if (taos != NULL && raw.raw != NULL && raw.raw_len > 0) {
    return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
  }

  SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
  return TSDB_CODE_INVALID_PARA;
}
2403

2404
// Writes every meta entry of a batch meta payload, in order, through
// writeRawImpl.
//
// Returns TSDB_CODE_INVALID_PARA when `taos` or `meta` is NULL, when the batch
// or one of its entries cannot be decoded, or when an entry is shorter than
// its SMqRspHead header. Processing stops at the first entry whose write
// fails, and that entry's code is returned. Decoders and decoded responses are
// released on every path.
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqBatchMetaRsp rsp = {0};
  SDecoder        coder = {0};
  int32_t         code = TSDB_CODE_SUCCESS;

  // decode and process req
  tDecoderInit(&coder, meta, metaLen);
  if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // The payload comes from the caller: an entry shorter than its header
    // would make the size computation below wrap around.
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }

    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
      code = TSDB_CODE_INVALID_PARA;
    } else {
      code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
    }
    // Release per-entry state whether or not decoding succeeded.
    tDeleteMqMetaRsp(&metaRsp);
    tDecoderClear(&metaCoder);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
  }

end:
  tDeleteMqBatchMetaRsp(&rsp);
  tDecoderClear(&coder);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc