• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5013

03 Apr 2026 03:59PM UTC coverage: 72.317% (+0.01%) from 72.305%
#5013

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

13131 existing lines in 160 files now uncovered.

257489 of 356056 relevant lines covered (72.32%)

129893134.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.59
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <string.h>
17
#include "cJSON.h"
18
#include "clientInt.h"
19
#include "nodes.h"
20
#include "osMemPool.h"
21
#include "osMemory.h"
22
#include "parser.h"
23
#include "taosdef.h"
24
#include "tarray.h"
25
#include "tbase64.h"
26
#include "tcol.h"
27
#include "tcompression.h"
28
#include "tdatablock.h"
29
#include "tdataformat.h"
30
#include "tdef.h"
31
#include "tglobal.h"
32
#include "tmsgtype.h"
33

34
#define RAW_LOG_END                                                           \
35
  if (code != 0) {                                                            \
36
    uError("%s failed at line:%d since:%s", __func__, lino, tstrerror(code)); \
37
  } else {                                                                    \
38
    uDebug("%s return success", __func__);                                    \
39
  }
40

41
#define RAW_LOG_START uDebug("%s start", __func__);
42

43
#define RAW_NULL_CHECK(c) \
44
  do {                    \
45
    if (c == NULL) {      \
46
      lino = __LINE__;    \
47
      code = terrno;      \
48
      goto end;           \
49
    }                     \
50
  } while (0)
51

52
#define RAW_FALSE_CHECK(c)           \
53
  do {                               \
54
    if (!(c)) {                      \
55
      code = TSDB_CODE_INVALID_PARA; \
56
      lino = __LINE__;               \
57
      goto end;                      \
58
    }                                \
59
  } while (0)
60

61
#define RAW_RETURN_CHECK(c) \
62
  do {                      \
63
    code = c;               \
64
    if (code != 0) {        \
65
      lino = __LINE__;      \
66
      goto end;             \
67
    }                       \
68
  } while (0)
69

70
#define LOG_ID_TAG   "connId:0x%" PRIx64 ", QID:0x%" PRIx64
71
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
72

73
#define TMQ_META_VERSION "1.0"
74

75
static cJSON* tmqAddObjectToArray(cJSON* array) {
127,734✔
76
  cJSON* item = cJSON_CreateObject();
127,734✔
77
  if (cJSON_AddItemToArray(array, item)) {
127,734✔
78
    return item;
127,734✔
79
  }
80
  cJSON_Delete(item);
×
81
  return NULL;
×
82
}
83

84
static cJSON* tmqAddStringToArray(cJSON* array, const char* str) {
×
85
  cJSON* item = cJSON_CreateString(str);
×
86
  if (cJSON_AddItemToArray(array, item)) {
×
87
    return item;
×
88
  }
89
  cJSON_Delete(item);
×
90
  return NULL;
×
91
}
92
 
93
#define ADD_TO_JSON_STRING(JSON,NAME,VALUE) \
94
  RAW_NULL_CHECK(cJSON_AddStringToObject(JSON, NAME, VALUE));
95

96
#define ADD_TO_JSON_BOOL(JSON,NAME,VALUE) \
97
  RAW_NULL_CHECK(cJSON_AddBoolToObject(JSON, NAME, VALUE));
98

99
#define ADD_TO_JSON_NUMBER(JSON,NAME,VALUE) \
100
  RAW_NULL_CHECK(cJSON_AddNumberToObject(JSON, NAME, VALUE));
101

102
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen);
103
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
4,675✔
104
  if (db == NULL) {
4,675✔
105
    return suid;
×
106
  }
107
  return suid + MurmurHash3_32(db, strlen(db));
4,675✔
108
}
109

110
static int32_t getLength(int8_t type, int32_t bytes, int32_t typeMod) {
96,207✔
111
  int32_t length = 0;
96,207✔
112
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_GEOMETRY) {
96,207✔
113
    length = bytes - VARSTR_HEADER_SIZE;
13,447✔
114
  } else if (type == TSDB_DATA_TYPE_NCHAR) {
82,760✔
115
    length = (bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
7,097✔
116
  } else if (IS_STR_DATA_BLOB(type)) {
75,663✔
117
    length = bytes - BLOBSTR_HEADER_SIZE;
×
118
  } else if (type == TSDB_DATA_TYPE_DECIMAL || type == TSDB_DATA_TYPE_DECIMAL64) {
75,663✔
119
    length = typeMod;
365✔
120
  }
121
  return length;
96,207✔
122
}
123

124
static int32_t buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, SExtSchema* pExtSchemas, char* name, int64_t id, int8_t t,
10,013✔
125
                                 bool isVirtual, SColRefWrapper* colRef, SColCmprWrapper* pColCmprRow, cJSON** pJson) {
126
  if (schemaRow == NULL || name == NULL || pColCmprRow == NULL || pJson == NULL) {
10,013✔
127
    uError("invalid parameter, schemaRow:%p, name:%p, pColCmprRow:%p, pJson:%p", schemaRow, name, pColCmprRow, pJson);
×
128
    return TSDB_CODE_INVALID_PARA;
×
129
  }
130
  int32_t code = TSDB_CODE_SUCCESS;
10,013✔
131
  int32_t lino = 0;
10,013✔
132
  int8_t  buildDefaultCompress = 0;
10,013✔
133
  if (pColCmprRow->nCols <= 0) {
10,013✔
134
    buildDefaultCompress = 1;
2,170✔
135
  }
136
  RAW_LOG_START
10,013✔
137
  char*  string = NULL;
10,013✔
138
  cJSON* json = cJSON_CreateObject();
10,013✔
139
  RAW_NULL_CHECK(json);
10,013✔
140

141
  ADD_TO_JSON_STRING(json, "type", "create");
10,013✔
142
  ADD_TO_JSON_STRING(json, "tableType", (t == TSDB_SUPER_TABLE ? "super" : "normal"));
10,013✔
143
  if (isVirtual){
10,013✔
144
    ADD_TO_JSON_BOOL(json, "isVirtual", isVirtual);
3,977✔
145
  }
146
  ADD_TO_JSON_STRING(json, "tableName", name);
10,013✔
147

148
  cJSON* columns = cJSON_AddArrayToObject(json, "columns");
10,013✔
149
  RAW_NULL_CHECK(columns);
10,013✔
150

151
  for (int i = 0; i < schemaRow->nCols; i++) {
95,743✔
152
    cJSON* column = tmqAddObjectToArray(columns);
85,730✔
153
    RAW_NULL_CHECK(column);
85,730✔
154
    SSchema* s = schemaRow->pSchema + i;
85,730✔
155
    ADD_TO_JSON_STRING(column, "name", s->name);
85,730✔
156
    ADD_TO_JSON_NUMBER(column, "type", s->type);
85,730✔
157
    int32_t typeMod = 0;
85,730✔
158
    if (pExtSchemas != NULL) {
85,730✔
159
      typeMod = pExtSchemas[i].typeMod;
55,930✔
160
    }
161
    int32_t length = getLength(s->type, s->bytes, typeMod);
85,730✔
162
    if (length > 0) {
85,730✔
163
      ADD_TO_JSON_NUMBER(column, "length", length);
12,216✔
164
    }
165

166
    if (isVirtual && colRef != NULL && i < colRef->nCols){
85,730✔
167
      SColRef* pColRef = colRef->pColRef + i;
7,587✔
168
      if (pColRef->hasRef) {
7,587✔
169
        cJSON* ref = cJSON_AddObjectToObject(column, "ref");
4,699✔
170
        RAW_NULL_CHECK(ref);
4,699✔
171
        ADD_TO_JSON_STRING(ref, "refDbName", pColRef->refDbName);
4,699✔
172
        ADD_TO_JSON_STRING(ref, "refTableName", pColRef->refTableName);
4,699✔
173
        ADD_TO_JSON_STRING(ref, "refColName", pColRef->refColName);
4,699✔
174
      }
175
    }
176
    ADD_TO_JSON_BOOL(column, "isPrimarykey", (s->flags & COL_IS_KEY));
85,730✔
177
    if (pColCmprRow == NULL) {
85,730✔
178
      continue;
×
179
    }
180

181
    uint32_t alg = 0;
85,730✔
182
    if (buildDefaultCompress) {
85,730✔
183
      alg = createDefaultColCmprByType(s->type);
7,587✔
184
    } else {
185
      SColCmpr* pColCmpr = pColCmprRow->pColCmpr + i;
78,143✔
186
      alg = pColCmpr->alg;
78,143✔
187
    }
188
    const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(alg));
85,730✔
189
    RAW_NULL_CHECK(encode);
85,730✔
190
    const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(alg));
85,730✔
191
    RAW_NULL_CHECK(compress);
85,730✔
192
    const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(alg));
85,730✔
193
    RAW_NULL_CHECK(level);
85,730✔
194

195
    ADD_TO_JSON_STRING(column, "encode", encode);
85,730✔
196
    ADD_TO_JSON_STRING(column, "compress", compress);
85,730✔
197
    ADD_TO_JSON_STRING(column, "level", level);
85,730✔
198
  }
199

200
  cJSON* tags = cJSON_AddArrayToObject(json, "tags");
10,013✔
201
  RAW_NULL_CHECK(tags);
10,013✔
202

203
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
17,854✔
204
    cJSON* tag = tmqAddObjectToArray(tags);
7,841✔
205
    RAW_NULL_CHECK(tag);
7,841✔
206
    SSchema* s = schemaTag->pSchema + i;
7,841✔
207
    ADD_TO_JSON_STRING(tag, "name", s->name);
7,841✔
208
    ADD_TO_JSON_NUMBER(tag, "type", s->type);
7,841✔
209
    int32_t length = getLength(s->type, s->bytes, 0);
7,841✔
210
    if (length > 0) {
7,841✔
211
      ADD_TO_JSON_NUMBER(tag, "length", length);
6,057✔
212
    }
213
  }
214

215
end:
10,013✔
216
  *pJson = json;
10,013✔
217
  RAW_LOG_END
10,013✔
218
  return code;
10,013✔
219
}
220

221
static int32_t setCompressOption(cJSON* json, uint32_t para) {
×
222
  if (json == NULL) {
×
223
    return TSDB_CODE_INVALID_PARA;
×
224
  }
225
  uint8_t encode = COMPRESS_L1_TYPE_U32(para);
×
226
  int32_t code = 0;
×
227
  int32_t lino = 0;
×
228
  RAW_LOG_START
×
229
  if (encode != 0) {
×
230
    const char* encodeStr = columnEncodeStr(encode);
×
231
    RAW_NULL_CHECK(encodeStr);
×
232
    ADD_TO_JSON_STRING(json, "encode", encodeStr);
×
233
    goto end;
×
234
  }
235
  uint8_t compress = COMPRESS_L2_TYPE_U32(para);
×
236
  if (compress != 0) {
×
237
    const char* compressStr = columnCompressStr(compress);
×
238
    RAW_NULL_CHECK(compressStr);
×
239
    ADD_TO_JSON_STRING(json, "compress", compressStr);
×
240
    goto end;
×
241
  }
242
  uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para);
×
243
  if (level != 0) {
×
244
    const char* levelStr = columnLevelStr(level);
×
245
    RAW_NULL_CHECK(levelStr);
×
246
    ADD_TO_JSON_STRING(json, "level", levelStr);
×
247
    goto end;
×
248
  }
249

250
end:
×
251
  RAW_LOG_END
×
252
  return code;
×
253
}
254
static int32_t buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) {
1,902✔
255
  if (alterData == NULL || pJson == NULL) {
1,902✔
256
    uError("invalid parameter in %s alterData:%p", __func__, alterData);
×
257
    return TSDB_CODE_INVALID_PARA;
×
258
  }
259
  SMAlterStbReq req = {0};
1,902✔
260
  cJSON*        json = NULL;
1,902✔
261
  char*         string = NULL;
1,902✔
262
  int32_t       code = 0;
1,902✔
263
  int32_t       lino = 0;
1,902✔
264
  RAW_LOG_START
1,902✔
265
  RAW_RETURN_CHECK(tDeserializeSMAlterStbReq(alterData, alterDataLen, &req));
1,902✔
266
  json = cJSON_CreateObject();
1,902✔
267
  RAW_NULL_CHECK(json);
1,902✔
268
  ADD_TO_JSON_STRING(json, "type", "alter");
1,902✔
269
  SName name = {0};
1,902✔
270
  RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
1,902✔
271
  ADD_TO_JSON_STRING(json, "tableType", "super");
1,902✔
272
  ADD_TO_JSON_STRING(json, "tableName", name.tname);
1,902✔
273
  ADD_TO_JSON_NUMBER(json, "alterType", req.alterType);
1,902✔
274

275
  switch (req.alterType) {
1,902✔
276
    case TSDB_ALTER_TABLE_ADD_TAG:
951✔
277
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
278
      if (taosArrayGetSize(req.pFields) != 1) {
951✔
279
        uError("invalid field num %" PRIzu " for alter type %d", taosArrayGetSize(req.pFields), req.alterType);
×
280
        cJSON_Delete(json);
×
281
        json = NULL;
×
282
        code = TSDB_CODE_INVALID_PARA;
×
283
        goto end;
×
284
      }
285
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
951✔
286
      RAW_NULL_CHECK(field);
951✔
287
      ADD_TO_JSON_STRING(json, "colName", field->name);
951✔
288
      ADD_TO_JSON_NUMBER(json, "colType", field->type);
951✔
289
      int32_t typeMode = 0;
951✔
290
      if (taosArrayGetSize(req.pTypeMods) > 0) {
951✔
291
        typeMode = *(STypeMod*)taosArrayGet(req.pTypeMods, 0);
×
292
      }
293
      int32_t length = getLength(field->type, field->bytes, typeMode);
951✔
294
      if (length > 0) {
951✔
295
        ADD_TO_JSON_NUMBER(json, "colLength", length);
951✔
296
      }
297

298
      break;
951✔
299
    }
300
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
×
301
      SFieldWithOptions* field = taosArrayGet(req.pFields, 0);
×
302
      RAW_NULL_CHECK(field);
×
303
      ADD_TO_JSON_STRING(json, "colName", field->name);
×
304
      ADD_TO_JSON_NUMBER(json, "colType", field->type);
×
305
      int32_t typeMode = 0;
×
306
      if (taosArrayGetSize(req.pTypeMods) > 0) {
×
307
        typeMode = *(STypeMod*)taosArrayGet(req.pTypeMods, 0);
×
308
      }
309
      int32_t length = getLength(field->type, field->bytes, typeMode);
×
310
      if (length > 0) {
×
311
        ADD_TO_JSON_NUMBER(json, "colLength", length);
×
312
      }
313

314
      RAW_RETURN_CHECK(setCompressOption(json, field->compress));
×
315
      break;
×
316
    }
317
    case TSDB_ALTER_TABLE_DROP_TAG:
×
318
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
319
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
×
320
      RAW_NULL_CHECK(field);
×
321
      ADD_TO_JSON_STRING(json, "colName", field->name);
×
322
      break;
×
323
    }
324
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
951✔
325
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
326
      if (taosArrayGetSize(req.pFields) != 1) {
951✔
327
        uError("invalid field num %" PRIzu " for alter type %d", taosArrayGetSize(req.pFields), req.alterType);
×
328
        cJSON_Delete(json);
×
329
        json = NULL;
×
330
        code = TSDB_CODE_INVALID_PARA;
×
331
        goto end;
×
332
      }
333
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
951✔
334
      RAW_NULL_CHECK(field);
951✔
335
      ADD_TO_JSON_STRING(json, "colName", field->name);
951✔
336
      ADD_TO_JSON_NUMBER(json, "colType", field->type);
951✔
337
      int32_t length = getLength(field->type, field->bytes, 0);
951✔
338
      if (length > 0) {
951✔
339
        ADD_TO_JSON_NUMBER(json, "colLength", length);
951✔
340
      }
341

342
      break;
951✔
343
    }
344
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
×
345
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
346
      TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0);
×
347
      RAW_NULL_CHECK(oldField);
×
348
      TAOS_FIELD* newField = taosArrayGet(req.pFields, 1);
×
349
      RAW_NULL_CHECK(newField);
×
350
      ADD_TO_JSON_STRING(json, "colName", oldField->name);
×
351
      ADD_TO_JSON_STRING(json, "colNewName", newField->name);
×
352
      break;
×
353
    }
354
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
×
355
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
×
356
      RAW_NULL_CHECK(field);
×
357
      ADD_TO_JSON_STRING(json, "colName", field->name);
×
358
      RAW_RETURN_CHECK(setCompressOption(json, field->bytes));
×
359
      break;
×
360
    }
361
    default:
×
362
      break;
×
363
  }
364

365
end:
1,902✔
366
  tFreeSMAltertbReq(&req);
1,902✔
367
  *pJson = json;
1,902✔
368
  RAW_LOG_END
1,902✔
369
  return code;
1,902✔
370
}
371

372
static int32_t processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
6,039✔
373
  if (metaRsp == NULL || pJson == NULL) {
6,039✔
374
    uError("invalid parameter in %s", __func__);
×
375
    return TSDB_CODE_INVALID_PARA;
×
376
  }
377
  int32_t        code = TSDB_CODE_SUCCESS;
6,039✔
378
  int32_t        lino = 0;
6,039✔
379
  SVCreateStbReq req = {0};
6,039✔
380
  SDecoder       coder = {0};
6,039✔
381

382
  RAW_LOG_START
6,039✔
383
  // decode and process req
384
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
6,039✔
385
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
6,039✔
386
  tDecoderInit(&coder, data, len);
6,039✔
387

388
  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&coder, &req));
6,039✔
389
  RAW_RETURN_CHECK(buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.pExtSchemas, req.name, req.suid,
6,039✔
390
                                        TSDB_SUPER_TABLE, req.virtualStb, NULL, &req.colCmpr, pJson));
391

392
end:
6,039✔
393
  tDecoderClear(&coder);
6,039✔
394
  RAW_LOG_END
6,039✔
395
  return code;
6,039✔
396
}
397

398
static int32_t processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
1,902✔
399
  if (metaRsp == NULL || pJson == NULL) {
1,902✔
400
    uError("invalid parameter in %s", __func__);
×
401
    return TSDB_CODE_INVALID_PARA;
×
402
  }
403
  SVCreateStbReq req = {0};
1,902✔
404
  SDecoder       coder = {0};
1,902✔
405
  int32_t        code = TSDB_CODE_SUCCESS;
1,902✔
406
  int32_t        lino = 0;
1,902✔
407
  RAW_LOG_START
1,902✔
408

409
  // decode and process req
410
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
1,902✔
411
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
1,902✔
412
  tDecoderInit(&coder, data, len);
1,902✔
413

414
  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&coder, &req));
1,902✔
415
  RAW_RETURN_CHECK(buildAlterSTableJson(req.alterOriData, req.alterOriDataLen, pJson));
1,902✔
416

417
end:
1,902✔
418
  tDecoderClear(&coder);
1,902✔
419
  RAW_LOG_END
1,902✔
420
  return code;
1,902✔
421
}
422

423
static int32_t buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
12,821✔
424
  if (json == NULL || pCreateReq == NULL) {
12,821✔
425
    uError("invalid parameter in %s", __func__);
×
426
    return TSDB_CODE_INVALID_PARA;
×
427
  }
428
  STag*   pTag = (STag*)pCreateReq->ctb.pTag;
12,821✔
429
  char*   sname = pCreateReq->ctb.stbName;
12,821✔
430
  char*   name = pCreateReq->name;
12,821✔
431
  SArray* tagName = pCreateReq->ctb.tagName;
12,821✔
432
  int64_t id = pCreateReq->uid;
12,821✔
433
  uint8_t tagNum = pCreateReq->ctb.tagNum;
12,821✔
434
  int32_t code = 0;
12,821✔
435
  int32_t lino = 0;
12,821✔
436
  SArray* pTagVals = NULL;
12,821✔
437
  char*   pJson = NULL;
12,821✔
438
  char*   buf = NULL;
12,821✔
439
  RAW_LOG_START
12,821✔
440

441
  ADD_TO_JSON_STRING(json, "tableName", name);
12,821✔
442

443
  if (pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) {
12,821✔
444
    cJSON* refs = cJSON_AddArrayToObject(json, "refs");
3,614✔
445
    RAW_NULL_CHECK(refs);
3,614✔
446

447
    for (int i = 0; i < pCreateReq->colRef.nCols; i++) {
18,070✔
448
      SColRef* pColRef = pCreateReq->colRef.pColRef + i;
14,456✔
449
  
450
      if (!pColRef->hasRef) {
14,456✔
451
        continue;
7,228✔
452
      }
453
      cJSON* ref = tmqAddObjectToArray(refs);
7,228✔
454
      RAW_NULL_CHECK(ref);
7,228✔
455
      ADD_TO_JSON_STRING(ref, "colName", pColRef->colName);
7,228✔
456
      ADD_TO_JSON_STRING(ref, "refDbName", pColRef->refDbName);
7,228✔
457
      ADD_TO_JSON_STRING(ref, "refTableName", pColRef->refTableName);
7,228✔
458
      ADD_TO_JSON_STRING(ref, "refColName", pColRef->refColName);
7,228✔
459
    }
460
  }
461

462
  ADD_TO_JSON_STRING(json, "using", sname);
12,821✔
463
  ADD_TO_JSON_NUMBER(json, "tagNum", tagNum);
12,821✔
464

465
  cJSON* tags = cJSON_AddArrayToObject(json, "tags");
12,821✔
466
  RAW_NULL_CHECK(tags);
12,821✔
467
  RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals));
12,821✔
468
  if (tTagIsJson(pTag)) {
12,821✔
469
    STag* p = (STag*)pTag;
×
470
    if (p->nTag == 0) {
×
471
      uWarn("p->nTag == 0");
×
472
      goto end;
×
473
    }
474
    parseTagDatatoJson(pTag, &pJson, NULL);
×
475
    RAW_NULL_CHECK(pJson);
×
476
    cJSON* tag = tmqAddObjectToArray(tags);
×
477
    RAW_NULL_CHECK(tag);
×
478
    STagVal* pTagVal = taosArrayGet(pTagVals, 0);
×
479
    RAW_NULL_CHECK(pTagVal);
×
480
    char* ptname = taosArrayGet(tagName, 0);
×
481
    RAW_NULL_CHECK(ptname);
×
482
    ADD_TO_JSON_STRING(tag, "name", ptname);
×
483
    ADD_TO_JSON_NUMBER(tag, "type", TSDB_DATA_TYPE_JSON);
×
484
    ADD_TO_JSON_STRING(tag, "value", pJson);
×
485
    goto end;
×
486
  }
487

488
  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
29,935✔
489
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
17,114✔
490
    RAW_NULL_CHECK(pTagVal);
17,114✔
491
    cJSON* tag = tmqAddObjectToArray(tags);
17,114✔
492
    RAW_NULL_CHECK(tag);
17,114✔
493
    char* ptname = taosArrayGet(tagName, i);
17,114✔
494
    RAW_NULL_CHECK(ptname);
17,114✔
495
    ADD_TO_JSON_STRING(tag, "name", ptname);
17,114✔
496
    ADD_TO_JSON_NUMBER(tag, "type", pTagVal->type);
17,114✔
497

498
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
28,851✔
499
      if (IS_STR_DATA_BLOB(pTagVal->type)) {
11,737✔
500
        goto end;
×
501
      }
502
      int64_t bufSize = 0;
11,737✔
503
      if (pTagVal->type == TSDB_DATA_TYPE_VARBINARY) {
11,737✔
504
        bufSize = pTagVal->nData * 2 + 2 + 3;
×
505
      } else {
506
        bufSize = pTagVal->nData + 3;
11,737✔
507
      }
508
      buf = taosMemoryCalloc(bufSize, 1);
11,737✔
509
      RAW_NULL_CHECK(buf);
11,737✔
510
      code = dataConverToStr(buf, bufSize, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
11,737✔
511
      if (code != TSDB_CODE_SUCCESS) {
11,737✔
512
        uError("convert tag value to string failed");
×
513
        goto end;
×
514
      }
515

516
      ADD_TO_JSON_STRING(tag, "value", buf)
11,737✔
517
      taosMemoryFreeClear(buf);
11,737✔
518
    } else {
519
      double val = 0;
5,377✔
520
      GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64, 0);  // currently tag type can't be decimal, so pass 0 as typeMod
5,377✔
521
      ADD_TO_JSON_NUMBER(tag, "value", val)
5,377✔
522
    }
523
  }
524

525
end:
12,821✔
526
  taosMemoryFree(pJson);
12,821✔
527
  taosArrayDestroy(pTagVals);
12,821✔
528
  taosMemoryFree(buf);
12,821✔
529
  RAW_LOG_END
12,821✔
530
  return code;
12,821✔
531
}
532

533
static int32_t buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) {
12,087✔
534
  if (pJson == NULL || pCreateReq == NULL) {
12,087✔
535
    uError("invalid parameter in %s", __func__);
×
536
    return TSDB_CODE_INVALID_PARA;
×
537
  }
538
  int32_t code = 0;
12,087✔
539
  int32_t lino = 0;
12,087✔
540
  RAW_LOG_START
12,087✔
541
  char*  string = NULL;
12,087✔
542
  cJSON* json = cJSON_CreateObject();
12,087✔
543
  RAW_NULL_CHECK(json);
12,087✔
544
  ADD_TO_JSON_STRING(json, "type", "create");
12,087✔
545
  ADD_TO_JSON_STRING(json, "tableType", "child");
12,087✔
546

547
  RAW_RETURN_CHECK(buildChildElement(json, pCreateReq));
12,087✔
548
  cJSON* createList = cJSON_AddArrayToObject(json, "createList");
12,087✔
549
  RAW_NULL_CHECK(createList);
12,087✔
550

551
  for (int i = 0; nReqs > 1 && i < nReqs; i++) {
12,821✔
552
    cJSON* create = tmqAddObjectToArray(createList);
734✔
553
    RAW_NULL_CHECK(create);
734✔
554
    RAW_RETURN_CHECK(buildChildElement(create, pCreateReq + i));
734✔
555
  }
556

557
end:
12,087✔
558
  *pJson = json;
12,087✔
559
  RAW_LOG_END
12,087✔
560
  return code;
12,087✔
561
}
562

563
static int32_t processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
15,012✔
564
  if (pJson == NULL || metaRsp == NULL) {
15,012✔
565
    uError("invalid parameter in %s", __func__);
×
566
    return TSDB_CODE_INVALID_PARA;
×
567
  }
568
  int32_t            code = TSDB_CODE_SUCCESS;
15,012✔
569
  int32_t            lino = 0;
15,012✔
570
  SDecoder           decoder = {0};
15,012✔
571
  SVCreateTbBatchReq req = {0};
15,012✔
572
  SVCreateTbReq*     pCreateReq;
573
  RAW_LOG_START
15,012✔
574
  // decode
575
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
15,012✔
576
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
15,012✔
577
  tDecoderInit(&decoder, data, len);
15,012✔
578
  RAW_RETURN_CHECK(tDecodeSVCreateTbBatchReq(&decoder, &req));
15,012✔
579
  // loop to create table
580
  if (req.nReqs > 0) {
15,012✔
581
    pCreateReq = req.pReqs;
15,012✔
582
    if (pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) {
15,012✔
583
      RAW_RETURN_CHECK(buildCreateCTableJson(req.pReqs, req.nReqs, pJson));
11,038✔
584
    } else if (pCreateReq->type == TSDB_NORMAL_TABLE || pCreateReq->type == TSDB_VIRTUAL_NORMAL_TABLE) {
3,974✔
585
      RAW_RETURN_CHECK(buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->pExtSchemas, pCreateReq->name,
3,974✔
586
                                            pCreateReq->uid, pCreateReq->type, pCreateReq->type == TSDB_NORMAL_TABLE ? false : true, 
587
                                            &pCreateReq->colRef, &pCreateReq->colCmpr, pJson));
588
    }
589
  }
590

591
end:
15,012✔
592
  tDeleteSVCreateTbBatchReq(&req);
15,012✔
593
  tDecoderClear(&decoder);
15,012✔
594
  RAW_LOG_END
15,012✔
595
  return code;
15,012✔
596
}
597

598
static int32_t processAutoCreateTable(SMqDataRsp* rsp, char** string) {
1,049✔
599
  int32_t lino = 0;
1,049✔
600
  int32_t code = TSDB_CODE_SUCCESS;
1,049✔
601
  RAW_LOG_START
1,049✔
602
  RAW_FALSE_CHECK(rsp != NULL && string != NULL);
1,049✔
603
  SDecoder*      decoder = NULL;
1,049✔
604
  SVCreateTbReq* pCreateReq = NULL;
1,049✔
605
  RAW_FALSE_CHECK(rsp->createTableNum > 0);
1,049✔
606

607
  decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
1,049✔
608
  RAW_NULL_CHECK(decoder);
1,049✔
609
  pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
1,049✔
610
  RAW_NULL_CHECK(pCreateReq);
1,049✔
611

612
  // loop to create table
613
  for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) {
2,465✔
614
    // decode
615
    void** data = taosArrayGet(rsp->createTableReq, iReq);
1,416✔
616
    RAW_NULL_CHECK(data);
1,416✔
617
    int32_t* len = taosArrayGet(rsp->createTableLen, iReq);
1,416✔
618
    RAW_NULL_CHECK(len);
1,416✔
619
    tDecoderInit(&decoder[iReq], *data, *len);
1,416✔
620
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq));
1,416✔
621

622
    if (pCreateReq[iReq].type != TSDB_CHILD_TABLE && pCreateReq[iReq].type != TSDB_NORMAL_TABLE) {
1,416✔
623
      uError("%s failed. pCreateReq[iReq].type:%d invalid", __func__, pCreateReq[iReq].type);
×
624
      code = TSDB_CODE_INVALID_PARA;
×
625
      goto end;
×
626
    }
627
  }
628
  cJSON* pJson = NULL;
1,049✔
629
  if (pCreateReq->type == TSDB_NORMAL_TABLE) {
1,049✔
UNCOV
630
    RAW_RETURN_CHECK(buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->pExtSchemas, pCreateReq->name,
×
631
                                          pCreateReq->uid, TSDB_NORMAL_TABLE, false, NULL, &pCreateReq->colCmpr, &pJson));
632
  } else if (pCreateReq->type == TSDB_CHILD_TABLE) {
1,049✔
633
    RAW_RETURN_CHECK(buildCreateCTableJson(pCreateReq, rsp->createTableNum, &pJson));
1,049✔
634
  }
635

636
  *string = cJSON_PrintUnformatted(pJson);
1,049✔
637
  cJSON_Delete(pJson);
1,049✔
638

639
  uDebug("auto created table return, sql json:%s", *string);
1,049✔
640

641
end:
1,049✔
642
  RAW_LOG_END
1,049✔
643
  for (int i = 0; decoder && pCreateReq && i < rsp->createTableNum; i++) {
2,465✔
644
    tDecoderClear(&decoder[i]);
1,416✔
645
    taosMemoryFreeClear(pCreateReq[i].comment);
1,416✔
646
    if (pCreateReq[i].type == TSDB_CHILD_TABLE) {
1,416✔
647
      taosArrayDestroy(pCreateReq[i].ctb.tagName);
1,416✔
648
    }
649
  }
650
  taosMemoryFree(decoder);
1,049✔
651
  taosMemoryFree(pCreateReq);
1,049✔
652
  return code;
1,049✔
653
}
654

655
static int32_t processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
6,560✔
656
  if (pJson == NULL || metaRsp == NULL) {
6,560✔
657
    uError("invalid parameter in %s", __func__);
×
658
    return TSDB_CODE_INVALID_PARA;
×
659
  }
660
  SDecoder     decoder = {0};
6,560✔
661
  SVAlterTbReq vAlterTbReq = {0};
6,560✔
662
  char*        string = NULL;
6,560✔
663
  cJSON*       json = NULL;
6,560✔
664
  int32_t      code = 0;
6,560✔
665
  int32_t      lino = 0;
6,560✔
666
  char*        buf = NULL;
6,560✔
667
  char*        buf1 = NULL;
6,560✔
668
  char*        buf2 = NULL;
6,560✔
669
  SNode*       pWhere = NULL;
6,560✔
670

671
  RAW_LOG_START
6,560✔
672

673
  // decode
674
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
6,560✔
675
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
6,560✔
676
  tDecoderInit(&decoder, data, len);
6,560✔
677
  RAW_RETURN_CHECK(tDecodeSVAlterTbReq(&decoder, &vAlterTbReq));
6,560✔
678

679
  json = cJSON_CreateObject();
6,560✔
680
  RAW_NULL_CHECK(json);
6,560✔
681
  ADD_TO_JSON_STRING(json, "type", "alter");
6,560✔
682

683
  char* tableType = NULL;
6,560✔
684
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL){
6,560✔
685
    tableType = "child";
2,892✔
686
  } else if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL){
3,668✔
687
    tableType = "super";
734✔
688
  } else if (vAlterTbReq.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF ||
2,934✔
689
             vAlterTbReq.action == TSDB_ALTER_TABLE_REMOVE_COLUMN_REF) {
1,834✔
690
    tableType = "";
2,200✔
691
  } else {
692
    tableType = "normal";
734✔
693
  }
694

695
  ADD_TO_JSON_STRING(json, "tableType", tableType);
6,560✔
696
  ADD_TO_JSON_STRING(json, "tableName", vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL ? "" : vAlterTbReq.tbName);
6,560✔
697
  ADD_TO_JSON_NUMBER(json, "alterType", vAlterTbReq.action);
6,560✔
698

699
  uDebug("alter table action:%d", vAlterTbReq.action);
6,560✔
700
  switch (vAlterTbReq.action) {
6,560✔
701
    case TSDB_ALTER_TABLE_ADD_COLUMN: 
734✔
702
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF: {
703
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
734✔
704
      ADD_TO_JSON_NUMBER(json, "colType", vAlterTbReq.type);
734✔
705

706
      int32_t length = getLength(vAlterTbReq.type, vAlterTbReq.bytes, vAlterTbReq.typeMod);
734✔
707
      if (length > 0) {
734✔
708
        ADD_TO_JSON_NUMBER(json, "colLength", length);
734✔
709
      }
710

711
      if (vAlterTbReq.action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF) {
734✔
712
        ADD_TO_JSON_STRING(json, "refDbName", vAlterTbReq.refDbName);
734✔
713
        ADD_TO_JSON_STRING(json, "refTbName", vAlterTbReq.refTbName);
734✔
714
        ADD_TO_JSON_STRING(json, "refColName", vAlterTbReq.refColName);
734✔
715
      }
716
      break;
734✔
717
    }
718
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
×
719
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
×
720
      ADD_TO_JSON_NUMBER(json, "colType", vAlterTbReq.type);
×
721

722
      int32_t length = getLength(vAlterTbReq.type, vAlterTbReq.bytes, vAlterTbReq.typeMod);
×
723
      if (length > 0) {
×
724
        ADD_TO_JSON_NUMBER(json, "colLength", length);
×
725
      }
726
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
×
727
      break;
×
728
    }
729
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
×
730
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
×
731
      break;
×
732
    }
733
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
×
734
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
×
735
      ADD_TO_JSON_NUMBER(json, "colType", vAlterTbReq.colModType);
×
736
      int32_t length = getLength(vAlterTbReq.colModType, vAlterTbReq.colModBytes, vAlterTbReq.typeMod);
×
737
      if (length > 0) {
×
738
        ADD_TO_JSON_NUMBER(json, "colLength", length);
×
739
      }
740
      break;
×
741
    }
742
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
×
743
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
×
744
      ADD_TO_JSON_STRING(json, "colNewName", vAlterTbReq.colNewName);
×
745
      break;
×
746
    }
747
    case TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL: {
2,892✔
748
      int32_t nTables = taosArrayGetSize(vAlterTbReq.tables);
2,892✔
749
      if (nTables <= 0) {
2,892✔
750
        code = TSDB_CODE_INVALID_PARA;
×
751
        uError("processAlterTable parse multi tables error");
×
752
        goto end;
×
753
      }
754

755
      cJSON* tables = cJSON_AddArrayToObject(json, "tables");
2,892✔
756
      RAW_NULL_CHECK(tables);
2,892✔
757

758
      for (int32_t i = 0; i < nTables; i++) {
6,518✔
759
        SUpdateTableTagVal* pTable = taosArrayGet(vAlterTbReq.tables, i);
3,626✔
760
        cJSON* tableObj = tmqAddObjectToArray(tables);
3,626✔
761
        RAW_NULL_CHECK(tableObj);
3,626✔
762

763
        ADD_TO_JSON_STRING(tableObj, "tableName", pTable->tbName);
3,626✔
764

765
        int32_t nTags = taosArrayGetSize(pTable->tags);
3,626✔
766
        cJSON* tags = cJSON_AddArrayToObject(tableObj, "tags");
3,626✔
767
        RAW_NULL_CHECK(tags);
3,626✔
768

769
        for (int32_t j = 0; j < nTags; j++) {
8,353✔
770
          cJSON* member = tmqAddObjectToArray(tags);
4,727✔
771
          RAW_NULL_CHECK(member);
4,727✔
772

773
          SUpdatedTagVal* pTagVal = taosArrayGet(pTable->tags, j);
4,727✔
774
          ADD_TO_JSON_STRING(member, "colName", pTagVal->tagName);
4,727✔
775

776
          if (pTagVal->regexp != NULL) {
4,727✔
777
            ADD_TO_JSON_STRING(member, "regexp", pTagVal->regexp);
×
778
            ADD_TO_JSON_STRING(member, "replacement", pTagVal->replacement);
×
779
          } else {
780
            bool isNull = pTagVal->isNull;
4,727✔
781
            if (!isNull) {
4,727✔
782
              int64_t bufSize = 0;
4,727✔
783
              if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
4,727✔
784
                bufSize = pTagVal->nTagVal * 2 + 2 + 3;
×
785
              } else {
786
                bufSize = pTagVal->nTagVal + 3;
4,727✔
787
              }
788
              buf1 = taosMemoryCalloc(bufSize, 1);
4,727✔
789
              RAW_NULL_CHECK(buf1);
4,727✔
790
              code = dataConverToStr(buf1, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL);
4,727✔
791
              if (code != TSDB_CODE_SUCCESS) {
4,727✔
792
                uError("convert tag value to string failed");
×
793
                goto end;
×
794
              }
795
              ADD_TO_JSON_STRING(member, "colValue", buf1);
4,727✔
796
              taosMemoryFreeClear(buf1);
4,727✔
797
            }
798
            ADD_TO_JSON_BOOL(member, "colValueNull", isNull);
4,727✔
799
          }
800
        }
801
      }
802
      break;
2,892✔
803
    }
804

805
    case TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL: {
734✔
806
      int32_t nTags = taosArrayGetSize(vAlterTbReq.pMultiTag);
734✔
807
      if (nTags <= 0) {
734✔
808
        code = TSDB_CODE_INVALID_PARA;
×
809
        uError("processAlterTable parse multi tags error");
×
810
        goto end;
×
811
      }
812

813
      cJSON* tags = cJSON_AddArrayToObject(json, "tags");
734✔
814
      RAW_NULL_CHECK(tags);
734✔
815

816
      for (int32_t i = 0; i < nTags; i++) {
1,468✔
817
        cJSON* member = tmqAddObjectToArray(tags);
734✔
818
        RAW_NULL_CHECK(member);
734✔
819

820
        SUpdatedTagVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
734✔
821
        ADD_TO_JSON_STRING(member, "colName", pTagVal->tagName);
734✔
822

823
        if (pTagVal->regexp != NULL) {
734✔
824
          ADD_TO_JSON_STRING(member, "regexp", pTagVal->regexp);
367✔
825
          ADD_TO_JSON_STRING(member, "replacement", pTagVal->replacement);
367✔
826
        } else {
827
          bool isNull = pTagVal->isNull;
367✔
828
          if (!isNull) {
367✔
829
            int64_t bufSize = 0;
367✔
830
            if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
367✔
831
              bufSize = pTagVal->nTagVal * 2 + 2 + 3;
×
832
            } else {
833
              bufSize = pTagVal->nTagVal + 3;
367✔
834
            }
835
            buf1 = taosMemoryCalloc(bufSize, 1);
367✔
836
            RAW_NULL_CHECK(buf1);
367✔
837
            code = dataConverToStr(buf1, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL);
367✔
838
            if (code != TSDB_CODE_SUCCESS) {
367✔
839
              uError("convert tag value to string failed");
×
840
              goto end;
×
841
            }
842
            ADD_TO_JSON_STRING(member, "colValue", buf1);
367✔
843
            taosMemoryFreeClear(buf1);
367✔
844
          }
845
          ADD_TO_JSON_BOOL(member, "colValueNull", isNull);
367✔
846
        }
847
      }
848

849
      if (vAlterTbReq.whereLen > 0) {
734✔
850
        buf1 = taosMemoryCalloc(vAlterTbReq.whereLen, 1);
734✔
851
        RAW_NULL_CHECK(buf1);
734✔
852
        buf2 = taosMemoryCalloc(vAlterTbReq.whereLen, 1);
734✔
853
        RAW_NULL_CHECK(buf2);
734✔
854
        memcpy(buf2, vAlterTbReq.where, vAlterTbReq.whereLen);
734✔
855
        RAW_RETURN_CHECK(nodesMsgToNode(buf2, vAlterTbReq.whereLen, &pWhere));
734✔
856
        int32_t tlen = 0;
734✔
857
        RAW_RETURN_CHECK(nodesNodeToSQL(pWhere, buf1, vAlterTbReq.whereLen, &tlen));
734✔
858
        if (tlen >= 1) buf1[tlen - 1] = 0;
734✔
859
        ADD_TO_JSON_STRING(json, "where", buf1 + 1);
734✔
860
        taosMemoryFreeClear(buf1);
734✔
861
        taosMemoryFreeClear(buf2);
734✔
862
      }
863
      break;
734✔
864
    }
865

866
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
×
867
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
×
868
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
×
869
      break;
×
870
    }
871
    case TSDB_ALTER_TABLE_ALTER_COLUMN_REF: {
1,100✔
872
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
1,100✔
873
      ADD_TO_JSON_STRING(json, "refDbName", vAlterTbReq.refDbName);
1,100✔
874
      ADD_TO_JSON_STRING(json, "refTbName", vAlterTbReq.refTbName);
1,100✔
875
      ADD_TO_JSON_STRING(json, "refColName", vAlterTbReq.refColName);
1,100✔
876
      break;
1,100✔
877
    }
878
    case TSDB_ALTER_TABLE_REMOVE_COLUMN_REF:{
1,100✔
879
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
1,100✔
880
      break;
1,100✔
881
    }
882
    default:
×
883
      break;
×
884
  }
885

886
end:
6,560✔
887
  nodesDestroyNode(pWhere);
6,560✔
888
  destroyAlterTbReq(&vAlterTbReq);
6,560✔
889
  tDecoderClear(&decoder);
6,560✔
890
  taosMemoryFree(buf);
6,560✔
891
  taosMemoryFree(buf1);
6,560✔
892
  taosMemoryFree(buf2);
6,560✔
893
  *pJson = json;
6,560✔
894
  RAW_LOG_END
6,560✔
895
  return code;
6,560✔
896
}
897

898
static int32_t processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
×
899
  if (pJson == NULL || metaRsp == NULL) {
×
900
    uError("invalid parameter in %s", __func__);
×
901
    return TSDB_CODE_INVALID_PARA;
×
902
  }
903
  SDecoder     decoder = {0};
×
904
  SVDropStbReq req = {0};
×
905
  cJSON*       json = NULL;
×
906
  int32_t      code = 0;
×
907
  int32_t      lino = 0;
×
908
  RAW_LOG_START
×
909

910
  // decode
911
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
×
912
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
×
913
  tDecoderInit(&decoder, data, len);
×
914
  RAW_RETURN_CHECK(tDecodeSVDropStbReq(&decoder, &req));
×
915

916
  json = cJSON_CreateObject();
×
917
  RAW_NULL_CHECK(json);
×
918
  ADD_TO_JSON_STRING(json, "type", "drop");
×
919
  ADD_TO_JSON_STRING(json, "tableType", "super");
×
920
  ADD_TO_JSON_STRING(json, "tableName", req.name);
×
921

922
end:
×
923
  tDecoderClear(&decoder);
×
924
  *pJson = json;
×
925
  RAW_LOG_END
×
926
  return code;
×
927
}
928
static int32_t processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
×
929
  if (pJson == NULL || metaRsp == NULL) {
×
930
    uError("invalid parameter in %s", __func__);
×
931
    return TSDB_CODE_INVALID_PARA;
×
932
  }
933
  SDeleteRes req = {0};
×
934
  SDecoder   coder = {0};
×
935
  cJSON*     json = NULL;
×
936
  int32_t    code = 0;
×
937
  int32_t    lino = 0;
×
938
  RAW_LOG_START
×
939

940
  // decode and process req
941
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
×
942
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
×
943

944
  tDecoderInit(&coder, data, len);
×
945
  RAW_RETURN_CHECK(tDecodeDeleteRes(&coder, &req));
×
946
  //  getTbName(req.tableFName);
947
  char sql[256] = {0};
×
948
  (void)snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
×
949
                 req.tsColName, req.skey, req.tsColName, req.ekey);
950

951
  json = cJSON_CreateObject();
×
952
  RAW_NULL_CHECK(json);
×
953
  ADD_TO_JSON_STRING(json, "type", "delete");
×
954
  ADD_TO_JSON_STRING(json, "sql", sql);
×
955

956
end:
×
957
  tDecoderClear(&coder);
×
958
  *pJson = json;
×
959
  RAW_LOG_END
×
960
  return code;
×
961
}
962

963
static int32_t processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
×
964
  if (pJson == NULL || metaRsp == NULL) {
×
965
    uError("invalid parameter in %s", __func__);
×
966
    return TSDB_CODE_INVALID_PARA;
×
967
  }
968
  SDecoder         decoder = {0};
×
969
  SVDropTbBatchReq req = {0};
×
970
  cJSON*           json = NULL;
×
971
  int32_t          code = 0;
×
972
  int32_t          lino = 0;
×
973
  RAW_LOG_START
×
974
  // decode
975
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
×
976
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
×
977
  tDecoderInit(&decoder, data, len);
×
978
  RAW_RETURN_CHECK(tDecodeSVDropTbBatchReq(&decoder, &req));
×
979

980
  json = cJSON_CreateObject();
×
981
  RAW_NULL_CHECK(json);
×
982
  ADD_TO_JSON_STRING(json, "type", "drop");
×
983

984
  cJSON* tableNameList = cJSON_AddArrayToObject(json, "tableNameList");
×
985
  RAW_NULL_CHECK(tableNameList);
×
986

987
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
988
    SVDropTbReq* pDropTbReq = req.pReqs + iReq;
×
989
    RAW_NULL_CHECK(tmqAddStringToArray(tableNameList, pDropTbReq->name));
×
990
  }
991

992
end:
×
993
  tDecoderClear(&decoder);
×
994
  *pJson = json;
×
995
  RAW_LOG_END
×
996
  return code;
×
997
}
998

999
static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
4,675✔
1000
  if (taos == NULL || meta == NULL) {
4,675✔
1001
    uError("invalid parameter in %s", __func__);
×
1002
    return TSDB_CODE_INVALID_PARA;
×
1003
  }
1004
  SVCreateStbReq req = {0};
4,675✔
1005
  SDecoder       coder = {0};
4,675✔
1006
  SMCreateStbReq pReq = {0};
4,675✔
1007
  int32_t        code = TSDB_CODE_SUCCESS;
4,675✔
1008
  int32_t        lino = 0;
4,675✔
1009
  SRequestObj*   pRequest = NULL;
4,675✔
1010
  SCmdMsgInfo    pCmdMsg = {0};
4,675✔
1011
  RAW_LOG_START
4,675✔
1012

1013
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
4,675✔
1014
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
4,675✔
1015
  pRequest->syncQuery = true;
4,675✔
1016
  if (!pRequest->pDb) {
4,675✔
1017
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1018
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
1019
    goto end;
×
1020
  }
1021
  // decode and process req
1022
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
4,675✔
1023
  uint32_t len = metaLen - sizeof(SMsgHead);
4,675✔
1024
  tDecoderInit(&coder, data, len);
4,675✔
1025
  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&coder, &req));
4,675✔
1026

1027
  int8_t           createDefaultCompress = 0;
4,675✔
1028
  SColCmprWrapper* p = &req.colCmpr;
4,675✔
1029
  if (p->nCols == 0) {
4,675✔
1030
    createDefaultCompress = 1;
×
1031
  }
1032
  // build create stable
1033
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
4,675✔
1034
  RAW_NULL_CHECK(pReq.pColumns);
4,675✔
1035
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
72,911✔
1036
    SSchema*          pSchema = req.schemaRow.pSchema + i;
68,236✔
1037
    SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
68,236✔
1038
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
68,236✔
1039

1040
    if (createDefaultCompress) {
68,236✔
1041
      field.compress = createDefaultColCmprByType(pSchema->type);
×
1042
    } else {
1043
      SColCmpr* pCmp = &req.colCmpr.pColCmpr[i];
68,236✔
1044
      field.compress = pCmp->alg;
68,236✔
1045
    }
1046
    if (req.pExtSchemas) field.typeMod = req.pExtSchemas[i].typeMod;
68,236✔
1047
    RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field));
136,472✔
1048
  }
1049
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
4,675✔
1050
  RAW_NULL_CHECK(pReq.pTags);
4,675✔
1051
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
10,802✔
1052
    SSchema* pSchema = req.schemaTag.pSchema + i;
6,127✔
1053
    SField   field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
6,127✔
1054
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
6,127✔
1055
    RAW_NULL_CHECK(taosArrayPush(pReq.pTags, &field));
12,254✔
1056
  }
1057

1058
  pReq.colVer = req.schemaRow.version;
4,675✔
1059
  pReq.tagVer = req.schemaTag.version;
4,675✔
1060
  pReq.numOfColumns = req.schemaRow.nCols;
4,675✔
1061
  pReq.numOfTags = req.schemaTag.nCols;
4,675✔
1062
  pReq.commentLen = -1;
4,675✔
1063
  pReq.suid = processSuid(req.suid, pRequest->pDb);
4,675✔
1064
  pReq.source = TD_REQ_FROM_TAOX;
4,675✔
1065
  pReq.igExists = true;
4,675✔
1066
  pReq.virtualStb = req.virtualStb;
4,675✔
1067

1068
  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
4,675✔
1069
         pReq.suid);
1070
  STscObj* pTscObj = pRequest->pTscObj;
4,675✔
1071
  SName    tableName = {0};
4,675✔
1072
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
4,675✔
1073
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
4,675✔
1074
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
4,675✔
1075
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
4,675✔
1076
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
4,675✔
1077
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
4,675✔
1078
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
4,675✔
1079
  RAW_NULL_CHECK(pCmdMsg.pMsg);
4,675✔
1080
  RAW_FALSE_CHECK(tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);
4,675✔
1081

1082
  SQuery pQuery = {0};
4,675✔
1083
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
4,675✔
1084
  pQuery.pCmdMsg = &pCmdMsg;
4,675✔
1085
  pQuery.msgType = pQuery.pCmdMsg->msgType;
4,675✔
1086
  pQuery.stableQuery = true;
4,675✔
1087

1088
  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
4,675✔
1089

1090
  if (pRequest->code == TSDB_CODE_SUCCESS) {
4,675✔
1091
    SCatalog* pCatalog = NULL;
4,675✔
1092
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
4,675✔
1093
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
4,675✔
1094
  }
1095

1096
  code = pRequest->code;
4,675✔
1097
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
4,675✔
1098

1099
end:
4,675✔
1100
  destroyRequest(pRequest);
4,675✔
1101
  tFreeSMCreateStbReq(&pReq);
4,675✔
1102
  tDecoderClear(&coder);
4,675✔
1103
  taosMemoryFree(pCmdMsg.pMsg);
4,675✔
1104
  RAW_LOG_END
4,675✔
1105
  return code;
4,675✔
1106
}
1107

1108
static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) {
×
1109
  if (taos == NULL || meta == NULL) {
×
1110
    uError("invalid parameter in %s", __func__);
×
1111
    return TSDB_CODE_INVALID_PARA;
×
1112
  }
1113
  SVDropStbReq req = {0};
×
1114
  SDecoder     coder = {0};
×
1115
  SMDropStbReq pReq = {0};
×
1116
  int32_t      code = TSDB_CODE_SUCCESS;
×
1117
  int32_t      lino = 0;
×
1118
  SRequestObj* pRequest = NULL;
×
1119
  SCmdMsgInfo  pCmdMsg = {0};
×
1120

1121
  RAW_LOG_START
×
1122
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
×
1123
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
×
1124
  pRequest->syncQuery = true;
×
1125
  if (!pRequest->pDb) {
×
1126
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
1127
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1128
    goto end;
×
1129
  }
1130
  // decode and process req
1131
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
×
1132
  uint32_t len = metaLen - sizeof(SMsgHead);
×
1133
  tDecoderInit(&coder, data, len);
×
1134
  RAW_RETURN_CHECK(tDecodeSVDropStbReq(&coder, &req));
×
1135
  SCatalog* pCatalog = NULL;
×
1136
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
×
1137
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
×
1138
                           .requestId = pRequest->requestId,
×
1139
                           .requestObjRefId = pRequest->self,
×
1140
                           .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
×
1141
  SName            pName = {0};
×
1142
  toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName);
×
1143
  STableMeta* pTableMeta = NULL;
×
1144
  code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
×
1145
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
×
1146
    uInfo(LOG_ID_TAG " stable %s not exist, ignore drop", LOG_ID_VALUE, req.name);
×
1147
    code = TSDB_CODE_SUCCESS;
×
1148
    taosMemoryFreeClear(pTableMeta);
×
1149
    goto end;
×
1150
  }
1151
  RAW_RETURN_CHECK(code);
×
1152
  pReq.suid = pTableMeta->uid;
×
1153
  taosMemoryFreeClear(pTableMeta);
×
1154

1155
  // build drop stable
1156
  pReq.igNotExists = true;
×
1157
  pReq.source = TD_REQ_FROM_TAOX;
×
1158
  //  pReq.suid = processSuid(req.suid, pRequest->pDb);
1159

1160
  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
×
1161
         pReq.suid);
1162
  STscObj* pTscObj = pRequest->pTscObj;
×
1163
  SName    tableName = {0};
×
1164
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
×
1165
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
×
1166

1167
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
×
1168
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
×
1169
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
×
1170
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
×
1171
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
×
1172
  RAW_NULL_CHECK(pCmdMsg.pMsg);
×
1173
  RAW_FALSE_CHECK(tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);
×
1174

1175
  SQuery pQuery = {0};
×
1176
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
×
1177
  pQuery.pCmdMsg = &pCmdMsg;
×
1178
  pQuery.msgType = pQuery.pCmdMsg->msgType;
×
1179
  pQuery.stableQuery = true;
×
1180

1181
  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
×
1182
  if (pRequest->code == TSDB_CODE_SUCCESS) {
×
1183
    // ignore the error code
1184
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
×
1185
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
×
1186
  }
1187

1188
  code = pRequest->code;
×
1189
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
×
1190

1191
end:
×
1192
  RAW_LOG_END
×
1193
  destroyRequest(pRequest);
×
1194
  tDecoderClear(&coder);
×
1195
  return code;
×
1196
}
1197

1198
typedef struct SVgroupCreateTableBatch {
1199
  SVCreateTbBatchReq req;
1200
  SVgroupInfo        info;
1201
  char               dbName[TSDB_DB_NAME_LEN];
1202
} SVgroupCreateTableBatch;
1203

1204
static void destroyCreateTbReqBatch(void* data) {
13,012✔
1205
  if (data == NULL) {
13,012✔
1206
    uError("invalid parameter in %s", __func__);
×
1207
    return;
×
1208
  }
1209
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
13,012✔
1210
  taosArrayDestroy(pTbBatch->req.pArray);
13,012✔
1211
}
1212

1213
static const SSchema* getNormalColSchema(const STableMeta* pTableMeta, const char* pColName) {
6,492✔
1214
  for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
18,768✔
1215
    const SSchema* pSchema = pTableMeta->schema + i;
18,768✔
1216
    if (0 == strcmp(pColName, pSchema->name)) {
18,768✔
1217
      return pSchema;
6,492✔
1218
    }
1219
  }
1220
  return NULL;
×
1221
}
1222

1223
static STableMeta* getTableMeta(SCatalog* pCatalog, SRequestConnInfo* conn, char* dbName, char* tbName, int32_t acctId){
6,492✔
1224
  SName       sName = {0};
6,492✔
1225
  toName(acctId, dbName, tbName, &sName);
6,492✔
1226
  STableMeta* pTableMeta = NULL;
6,492✔
1227
  int32_t code = catalogGetTableMeta(pCatalog, conn, &sName, &pTableMeta);
6,492✔
1228
  if (code != 0) {
6,492✔
1229
    uError("failed to get table meta for reference table:%s.%s", dbName, tbName);
×
1230
    taosMemoryFreeClear(pTableMeta);
×
1231
    terrno = code;
×
1232
    return NULL;
×
1233
  }
1234
  return pTableMeta;
6,492✔
1235
}
1236

1237
static int32_t checkColRef(STableMeta* pTableMeta, char* colName, uint8_t precision, const SSchema* pSchema) {
6,126✔
1238
  int32_t code = TSDB_CODE_SUCCESS;
6,126✔
1239
  if (pTableMeta->tableInfo.precision != precision) {
6,126✔
1240
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE;
×
1241
    uError("timestamp precision of virtual table and its reference table do not match");
×
1242
    goto end;
×
1243
  }
1244
  // org table cannot has composite primary key
1245
  if (pTableMeta->tableInfo.numOfColumns > 1 && pTableMeta->schema[1].flags & COL_IS_KEY) {
6,126✔
1246
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
×
1247
    uError("virtual table's column:\"%s\"'s reference can not from table with composite key", colName);
×
1248
    goto end;
×
1249
  }
1250

1251
  // org table must be child table or normal table
1252
  if (pTableMeta->tableType != TSDB_NORMAL_TABLE && pTableMeta->tableType != TSDB_CHILD_TABLE) {
6,126✔
1253
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
×
1254
    uError("virtual table's column:\"%s\"'s reference can only be normal table or child table", colName);
×
1255
    goto end;
×
1256
  }
1257

1258
  const SSchema* pRefCol = getNormalColSchema(pTableMeta, colName);
6,126✔
1259
  if (NULL == pRefCol) {
6,126✔
1260
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
×
1261
    uError("virtual table's column:\"%s\"'s reference column:\"%s\" not exist", pSchema->name, colName);
×
1262
    goto end;
×
1263
  }
1264

1265
  if (pRefCol->type != pSchema->type || pRefCol->bytes != pSchema->bytes) {
6,126✔
1266
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE;
×
1267
    uError("virtual table's column:\"%s\"'s type and reference column:\"%s\"'s type not match, %d %d %d %d",
×
1268
            pSchema->name, colName, pSchema->type, pSchema->bytes, pRefCol->type, pRefCol->bytes);
1269
    goto end;
×
1270
  }
1271

1272
end:
6,126✔
1273
  return code;
6,126✔
1274
}
1275

1276
static int32_t checkColRefForCreate(SCatalog* pCatalog, SRequestConnInfo* conn, SColRef* pColRef, int32_t acctId, uint8_t precision, SSchema* pSchema) {
5,760✔
1277
  STableMeta* pTableMeta = getTableMeta(pCatalog, conn, pColRef->refDbName, pColRef->refTableName, acctId);
5,760✔
1278
  if (pTableMeta == NULL) {
5,760✔
1279
      return terrno;
×
1280
  }
1281
  int32_t code = checkColRef(pTableMeta, pColRef->refColName, precision, pSchema);
5,760✔
1282
  taosMemoryFreeClear(pTableMeta);
5,760✔
1283
  return code;
5,760✔
1284
}
1285

1286
static int32_t checkColRefForAdd(SCatalog* pCatalog, SRequestConnInfo* conn, int32_t acctId, char* dbName, char* tbName, char* colName, 
×
1287
  char* dbNameSrc, char* tbNameSrc, char* colNameSrc, int8_t type, int32_t bytes) {
1288
  int32_t code = 0;
×
1289
  STableMeta* pTableMeta = getTableMeta(pCatalog, conn, dbName, tbName, acctId);
×
1290
  if (pTableMeta == NULL) {
×
1291
    code = terrno;
×
1292
    goto end;
×
1293
  }
1294
  STableMeta* pTableMetaSrc = getTableMeta(pCatalog, conn, dbNameSrc, tbNameSrc, acctId);
×
1295
  if (pTableMetaSrc == NULL) {
×
1296
    code = terrno;
×
1297
    goto end;
×
1298
  }
1299

1300
  SSchema pSchema = {.type = type, .bytes = bytes};
×
1301
  tstrncpy(pSchema.name, colNameSrc, TSDB_COL_NAME_LEN);
×
1302
  code = checkColRef(pTableMeta, colName, pTableMetaSrc->tableInfo.precision, &pSchema);
×
1303

1304
end:
×
1305
  taosMemoryFreeClear(pTableMeta);
×
1306
  taosMemoryFreeClear(pTableMetaSrc);
×
1307
  return code;
×
1308
}
1309

1310
static int32_t checkColRefForAlter(SCatalog* pCatalog, SRequestConnInfo* conn, int32_t acctId, char* dbName, char* tbName, char* colName, 
366✔
1311
  char* dbNameSrc, char* tbNameSrc, char* colNameSrc) {
1312
  int32_t code = 0;
366✔
1313
  STableMeta* pTableMeta = getTableMeta(pCatalog, conn, dbName, tbName, acctId);
366✔
1314
  if (pTableMeta == NULL) {
366✔
1315
    code = terrno;
×
1316
    goto end;
×
1317
  }
1318
  STableMeta* pTableMetaSrc = getTableMeta(pCatalog, conn, dbNameSrc, tbNameSrc, acctId);
366✔
1319
  if (pTableMetaSrc == NULL) {
366✔
1320
    code = terrno;
×
1321
    goto end;
×
1322
  }
1323
  const SSchema* pSchema = getNormalColSchema(pTableMetaSrc, colNameSrc);
366✔
1324
  if (NULL == pSchema) {
366✔
1325
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
×
1326
    uError("virtual table's column:\"%s\" not exist", colNameSrc);
×
1327
    goto end;
×
1328
  }
1329

1330
  code = checkColRef(pTableMeta, colName, pTableMetaSrc->tableInfo.precision, pSchema);
366✔
1331

1332
end:
366✔
1333
  taosMemoryFreeClear(pTableMeta);
366✔
1334
  taosMemoryFreeClear(pTableMetaSrc);
366✔
1335
  return code;
366✔
1336
}
1337

1338
static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) {
13,012✔
1339
  if (taos == NULL || meta == NULL) {
13,012✔
1340
    uError("invalid parameter in %s", __func__);
×
1341
    return TSDB_CODE_INVALID_PARA;
×
1342
  }
1343
  SVCreateTbBatchReq req = {0};
13,012✔
1344
  SDecoder           coder = {0};
13,012✔
1345
  int32_t            code = TSDB_CODE_SUCCESS;
13,012✔
1346
  int32_t            lino = 0;
13,012✔
1347
  SRequestObj*       pRequest = NULL;
13,012✔
1348
  SQuery*            pQuery = NULL;
13,012✔
1349
  SHashObj*          pVgroupHashmap = NULL;
13,012✔
1350

1351
  RAW_LOG_START
13,012✔
1352
  SArray* pTagList = taosArrayInit(0, POINTER_BYTES);
13,012✔
1353
  RAW_NULL_CHECK(pTagList);
13,012✔
1354
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
13,012✔
1355
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
13,012✔
1356

1357
  pRequest->syncQuery = true;
13,012✔
1358
  if (!pRequest->pDb) {
13,012✔
1359
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
1360
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1361
    goto end;
×
1362
  }
1363
  // decode and process req
1364
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
13,012✔
1365
  uint32_t len = metaLen - sizeof(SMsgHead);
13,012✔
1366
  tDecoderInit(&coder, data, len);
13,012✔
1367
  RAW_RETURN_CHECK(tDecodeSVCreateTbBatchReq(&coder, &req));
13,012✔
1368
  STscObj* pTscObj = pRequest->pTscObj;
13,012✔
1369

1370
  SVCreateTbReq* pCreateReq = NULL;
13,012✔
1371
  SCatalog*      pCatalog = NULL;
13,012✔
1372
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
13,012✔
1373
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
13,012✔
1374
  RAW_NULL_CHECK(pVgroupHashmap);
13,012✔
1375
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
13,012✔
1376

1377
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
13,012✔
1378
                           .requestId = pRequest->requestId,
13,012✔
1379
                           .requestObjRefId = pRequest->self,
13,012✔
1380
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
13,012✔
1381

1382
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
13,012✔
1383
  RAW_NULL_CHECK(pRequest->tableList);
13,012✔
1384
  // loop to create table
1385
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
26,024✔
1386
    pCreateReq = req.pReqs + iReq;
13,012✔
1387

1388
    SVgroupInfo pInfo = {0};
13,012✔
1389
    SName       pName = {0};
13,012✔
1390
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
13,012✔
1391
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
13,012✔
1392

1393
    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
13,012✔
1394
    // change tag cid to new cid
1395
    if (pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) {
13,012✔
1396
      STableMeta* pTableMeta = NULL;
9,390✔
1397
      SName       sName = {0};
9,390✔
1398
      tb_uid_t    oldSuid = pCreateReq->ctb.suid;
9,390✔
1399
      //      pCreateReq->ctb.suid = processSuid(pCreateReq->ctb.suid, pRequest->pDb);
1400
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
9,390✔
1401
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
9,390✔
1402
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
9,390✔
1403
        uInfo(LOG_ID_TAG " super table %s not exist, ignore create child table %s", LOG_ID_VALUE,
×
1404
              pCreateReq->ctb.stbName, pCreateReq->name);
1405
        code = TSDB_CODE_SUCCESS;
×
1406
        taosMemoryFreeClear(pTableMeta);
×
1407
        continue;
×
1408
      }
1409

1410
      RAW_RETURN_CHECK(code);
9,390✔
1411
      pCreateReq->ctb.suid = pTableMeta->uid;
9,390✔
1412

1413
      bool changeDB = strlen(tmqWriteRefDB) > 0;
9,390✔
1414
      for (int32_t i = 0; changeDB && i < pCreateReq->colRef.nCols; i++) {
23,846✔
1415
        SColRef* pColRef = pCreateReq->colRef.pColRef + i;
14,456✔
1416
        tstrncpy(pColRef->refDbName, tmqWriteRefDB, TSDB_DB_NAME_LEN);
14,456✔
1417
      }
1418

1419
      for (int32_t i = 0; tmqWriteCheckRef && i < pCreateReq->colRef.nCols && i < pTableMeta->tableInfo.numOfColumns; i++) {
20,910✔
1420
        SColRef* pColRef = pCreateReq->colRef.pColRef + i;
11,520✔
1421
        if (!pColRef || !pColRef->hasRef) continue;
11,520✔
1422
        SSchema* pSchema = pTableMeta->schema + i;
5,760✔
1423
        RAW_RETURN_CHECK(checkColRefForCreate(pCatalog, &conn, pColRef, pTscObj->acctId, pTableMeta->tableInfo.precision, pSchema));
5,760✔
1424
      }
1425
      
1426
      SArray* pTagVals = NULL;
9,390✔
1427
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
9,390✔
1428
      if (code != TSDB_CODE_SUCCESS) {
9,390✔
1429
        uError("create tb invalid tag data %s", pCreateReq->name);
×
1430
        taosMemoryFreeClear(pTableMeta);
×
1431
        goto end;
×
1432
      }
1433

1434
      bool rebuildTag = false;
9,390✔
1435
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
22,406✔
1436
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
13,016✔
1437
        if (tName == NULL) {
13,016✔
1438
          continue;
×
1439
        }
1440
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
13,016✔
1441
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
34,371✔
1442
          SSchema* tag = &pTableMeta->schema[j];
21,355✔
1443
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
21,355✔
1444
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
13,016✔
1445
            if (pTagVal) {
13,016✔
1446
              if (pTagVal->cid != tag->colId) {
13,016✔
1447
                pTagVal->cid = tag->colId;
345✔
1448
                rebuildTag = true;
345✔
1449
              }
1450
            } else {
1451
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
×
1452
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
1453
            }
1454
          }
1455
        }
1456
      }
1457
      taosMemoryFreeClear(pTableMeta);
9,390✔
1458
      if (rebuildTag) {
9,390✔
1459
        STag* ppTag = NULL;
345✔
1460
        code = tTagNew(pTagVals, 1, false, &ppTag);
345✔
1461
        taosArrayDestroy(pTagVals);
345✔
1462
        pTagVals = NULL;
345✔
1463
        if (code != TSDB_CODE_SUCCESS) {
345✔
1464
          goto end;
×
1465
        }
1466
        if (NULL == taosArrayPush(pTagList, &ppTag)) {
345✔
1467
          code = terrno;
×
1468
          tTagFree(ppTag);
×
1469
          goto end;
×
1470
        }
1471
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
345✔
1472
      }
1473
      taosArrayDestroy(pTagVals);
9,390✔
1474
    }
1475
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
26,024✔
1476

1477
    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
13,012✔
1478
    if (pTableBatch == NULL) {
13,012✔
1479
      SVgroupCreateTableBatch tBatch = {0};
13,012✔
1480
      tBatch.info = pInfo;
13,012✔
1481
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);
13,012✔
1482

1483
      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
13,012✔
1484
      RAW_NULL_CHECK(tBatch.req.pArray);
13,012✔
1485
      RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pCreateReq));
26,024✔
1486
      tBatch.req.source = TD_REQ_FROM_TAOX;
13,012✔
1487
      RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)));
13,012✔
1488
    } else {  // add to the correct vgroup
1489
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
×
1490
    }
1491
  }
1492

1493
  if (taosHashGetSize(pVgroupHashmap) == 0) {
13,012✔
1494
    goto end;
×
1495
  }
1496
  SArray* pBufArray = NULL;
13,012✔
1497
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
13,012✔
1498
  pQuery = NULL;
13,012✔
1499
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
13,012✔
1500
  if (TSDB_CODE_SUCCESS != code) goto end;
13,012✔
1501
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
13,012✔
1502
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
13,012✔
1503
  pQuery->stableQuery = false;
13,012✔
1504
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
13,012✔
1505
  if (TSDB_CODE_SUCCESS != code) goto end;
13,012✔
1506
  RAW_NULL_CHECK(pQuery->pRoot);
13,012✔
1507

1508
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));
13,012✔
1509

1510
  launchQueryImpl(pRequest, pQuery, true, NULL);
13,012✔
1511
  if (pRequest->code == TSDB_CODE_SUCCESS) {
13,012✔
1512
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
13,012✔
1513
  }
1514

1515
  code = pRequest->code;
13,012✔
1516
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
13,012✔
1517

1518
end:
13,012✔
1519
  tDeleteSVCreateTbBatchReq(&req);
13,012✔
1520

1521
  taosHashCleanup(pVgroupHashmap);
13,012✔
1522
  destroyRequest(pRequest);
13,012✔
1523
  tDecoderClear(&coder);
13,012✔
1524
  qDestroyQuery(pQuery);
13,012✔
1525
  taosArrayDestroyP(pTagList, NULL);
13,012✔
1526
  RAW_LOG_END
13,012✔
1527
  return code;
13,012✔
1528
}
1529

1530
typedef struct SVgroupDropTableBatch {
1531
  SVDropTbBatchReq req;
1532
  SVgroupInfo      info;
1533
  char             dbName[TSDB_DB_NAME_LEN];
1534
} SVgroupDropTableBatch;
1535

1536
static void destroyDropTbReqBatch(void* data) {
×
1537
  if (data == NULL) {
×
1538
    uError("invalid parameter in %s", __func__);
×
1539
    return;
×
1540
  }
1541
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
×
1542
  taosArrayDestroy(pTbBatch->req.pArray);
×
1543
}
1544

1545
static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) {
×
1546
  if (taos == NULL || meta == NULL) {
×
1547
    uError("invalid parameter in %s", __func__);
×
1548
    return TSDB_CODE_INVALID_PARA;
×
1549
  }
1550
  SVDropTbBatchReq req = {0};
×
1551
  SDecoder         coder = {0};
×
1552
  int32_t          code = TSDB_CODE_SUCCESS;
×
1553
  int32_t          lino = 0;
×
1554
  SRequestObj*     pRequest = NULL;
×
1555
  SQuery*          pQuery = NULL;
×
1556
  SHashObj*        pVgroupHashmap = NULL;
×
1557

1558
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
×
1559
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
×
1560

1561
  pRequest->syncQuery = true;
×
1562
  if (!pRequest->pDb) {
×
1563
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
1564
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1565
    goto end;
×
1566
  }
1567
  // decode and process req
1568
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
×
1569
  uint32_t len = metaLen - sizeof(SMsgHead);
×
1570
  tDecoderInit(&coder, data, len);
×
1571
  RAW_RETURN_CHECK(tDecodeSVDropTbBatchReq(&coder, &req));
×
1572
  STscObj* pTscObj = pRequest->pTscObj;
×
1573

1574
  SVDropTbReq* pDropReq = NULL;
×
1575
  SCatalog*    pCatalog = NULL;
×
1576
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
×
1577

1578
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
×
1579
  RAW_NULL_CHECK(pVgroupHashmap);
×
1580
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
×
1581

1582
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
×
1583
                           .requestId = pRequest->requestId,
×
1584
                           .requestObjRefId = pRequest->self,
×
1585
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
×
1586
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
×
1587
  RAW_NULL_CHECK(pRequest->tableList);
×
1588
  // loop to create table
1589
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
1590
    pDropReq = req.pReqs + iReq;
×
1591
    pDropReq->igNotExists = true;
×
1592
    //    pDropReq->suid = processSuid(pDropReq->suid, pRequest->pDb);
1593

1594
    SVgroupInfo pInfo = {0};
×
1595
    SName       pName = {0};
×
1596
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
×
1597
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
×
1598

1599
    STableMeta* pTableMeta = NULL;
×
1600
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
×
1601
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
×
1602
      code = TSDB_CODE_SUCCESS;
×
1603
      uInfo(LOG_ID_TAG " table %s not exist, ignore drop", LOG_ID_VALUE, pDropReq->name);
×
1604
      taosMemoryFreeClear(pTableMeta);
×
1605
      continue;
×
1606
    }
1607
    RAW_RETURN_CHECK(code);
×
1608
    tb_uid_t oldSuid = pDropReq->suid;
×
1609
    pDropReq->suid = pTableMeta->suid;
×
1610
    taosMemoryFreeClear(pTableMeta);
×
1611
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
×
1612
           pDropReq->suid);
1613

1614
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
×
1615
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
×
1616
    if (pTableBatch == NULL) {
×
1617
      SVgroupDropTableBatch tBatch = {0};
×
1618
      tBatch.info = pInfo;
×
1619
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
×
1620
      RAW_NULL_CHECK(tBatch.req.pArray);
×
1621
      RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pDropReq));
×
1622
      RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)));
×
1623
    } else {  // add to the correct vgroup
1624
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
×
1625
    }
1626
  }
1627

1628
  if (taosHashGetSize(pVgroupHashmap) == 0) {
×
1629
    goto end;
×
1630
  }
1631
  SArray* pBufArray = NULL;
×
1632
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
×
1633
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
×
1634
  if (TSDB_CODE_SUCCESS != code) goto end;
×
1635
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
×
1636
  pQuery->msgType = TDMT_VND_DROP_TABLE;
×
1637
  pQuery->stableQuery = false;
×
1638
  pQuery->pRoot = NULL;
×
1639
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
×
1640
  if (TSDB_CODE_SUCCESS != code) goto end;
×
1641
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));
×
1642

1643
  launchQueryImpl(pRequest, pQuery, true, NULL);
×
1644
  if (pRequest->code == TSDB_CODE_SUCCESS) {
×
1645
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
×
1646
  }
1647
  code = pRequest->code;
×
1648
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
×
1649

1650
end:
×
1651
  taosHashCleanup(pVgroupHashmap);
×
1652
  destroyRequest(pRequest);
×
1653
  tDecoderClear(&coder);
×
1654
  qDestroyQuery(pQuery);
×
1655
  RAW_LOG_END
×
1656
  return code;
×
1657
}
1658

1659
static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) {
×
1660
  if (taos == NULL || meta == NULL) {
×
1661
    uError("invalid parameter in %s", __func__);
×
1662
    return TSDB_CODE_INVALID_PARA;
×
1663
  }
1664
  SDeleteRes req = {0};
×
1665
  SDecoder   coder = {0};
×
1666
  char       sql[256] = {0};
×
1667
  int32_t    code = TSDB_CODE_SUCCESS;
×
1668
  int32_t    lino = 0;
×
1669
  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);
×
1670

1671
  // decode and process req
1672
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
×
1673
  uint32_t len = metaLen - sizeof(SMsgHead);
×
1674
  tDecoderInit(&coder, data, len);
×
1675
  RAW_RETURN_CHECK(tDecodeDeleteRes(&coder, &req));
×
1676
  (void)snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
×
1677
                 req.tsColName, req.skey, req.tsColName, req.ekey);
1678

1679
  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
×
1680
  RAW_NULL_CHECK(res);
×
1681
  SRequestObj* pRequest = (SRequestObj*)res;
×
1682
  code = pRequest->code;
×
1683
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
×
1684
    code = TSDB_CODE_SUCCESS;
×
1685
  }
1686
  taos_free_result(res);
×
1687
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));
×
1688

1689
end:
×
1690
  RAW_LOG_END
×
1691
  tDecoderClear(&coder);
×
1692
  return code;
×
1693
}
1694

1695
static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) {
6,228✔
1696
  if (taos == NULL || meta == NULL) {
6,228✔
1697
    uError("invalid parameter in %s", __func__);
×
1698
    return TSDB_CODE_INVALID_PARA;
×
1699
  }
1700
  SVAlterTbReq   req = {0};
6,228✔
1701
  SDecoder       dcoder = {0};
6,228✔
1702
  int32_t        code = TSDB_CODE_SUCCESS;
6,228✔
1703
  int32_t        lino = 0;
6,228✔
1704
  SRequestObj*   pRequest = NULL;
6,228✔
1705
  SQuery*        pQuery = NULL;
6,228✔
1706
  SArray*        pArray = NULL;
6,228✔
1707
  SVgDataBlocks* pVgData = NULL;
6,228✔
1708
  SArray*        pVgList = NULL;
6,228✔
1709
  SEncoder       coder = {0};
6,228✔
1710
  SHashObj*      pVgroupHashmap = NULL;
6,228✔
1711

1712
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
6,228✔
1713
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
6,228✔
1714
  pRequest->syncQuery = true;
6,228✔
1715
  if (!pRequest->pDb) {
6,228✔
1716
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
1717
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1718
    goto end;
×
1719
  }
1720
  // decode and process req
1721
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
6,228✔
1722
  uint32_t len = metaLen - sizeof(SMsgHead);
6,228✔
1723
  tDecoderInit(&dcoder, data, len);
6,228✔
1724
  RAW_RETURN_CHECK(tDecodeSVAlterTbReq(&dcoder, &req));
6,228✔
1725
  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
1726
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
6,228✔
1727
    uInfo(LOG_ID_TAG " alter table action is UPDATE_OPTIONS, ignore", LOG_ID_VALUE);
×
1728
    goto end;
×
1729
  }
1730

1731
  STscObj*  pTscObj = pRequest->pTscObj;
6,228✔
1732
  SCatalog* pCatalog = NULL;
6,228✔
1733
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
6,228✔
1734
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
6,228✔
1735
                           .requestId = pRequest->requestId,
6,228✔
1736
                           .requestObjRefId = pRequest->self,
6,228✔
1737
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
6,228✔
1738

1739
  // Handle Type 1 batch modification with vnode grouping
1740
  if (req.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL) {
6,228✔
1741
    if (req.tables == NULL || taosArrayGetSize(req.tables) == 0) {
2,560✔
1742
      uError(LOG_ID_TAG " Type 1 batch alter has empty tables array", LOG_ID_VALUE);
×
1743
      code = TSDB_CODE_INVALID_PARA;
×
1744
      goto end;
×
1745
    }
1746

1747
    int32_t nTables = taosArrayGetSize(req.tables);
2,560✔
1748
    uDebug(LOG_ID_TAG " Type 1 batch alter with %d tables, grouping by vnode", LOG_ID_VALUE, nTables);
2,560✔
1749

1750
    // Create hashmap to group tables by vgId
1751
    pVgroupHashmap = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
2,560✔
1752
    RAW_NULL_CHECK(pVgroupHashmap);
2,560✔
1753

1754
    // Group tables by vnode
1755
    for (int32_t i = 0; i < nTables; i++) {
5,854✔
1756
      SUpdateTableTagVal* pTable = taosArrayGet(req.tables, i);
3,294✔
1757
      if (pTable == NULL || pTable->tbName == NULL) {
3,294✔
1758
        uWarn(LOG_ID_TAG " Type 1 batch alter table[%d] has invalid name, skip", LOG_ID_VALUE, i);
×
1759
        continue;
×
1760
      }
1761

1762
      // Query vnode for this table
1763
      SVgroupInfo vgInfo = {0};
3,294✔
1764
      SName pName = {0};
3,294✔
1765
      toName(pTscObj->acctId, pRequest->pDb, pTable->tbName, &pName);
3,294✔
1766
      code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgInfo);
3,294✔
1767
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
3,294✔
1768
        uWarn(LOG_ID_TAG " Type 1 batch alter table %s not found, skip", LOG_ID_VALUE, pTable->tbName);
×
1769
        code = TSDB_CODE_SUCCESS;
×
1770
        continue;
×
1771
      }
1772
      RAW_RETURN_CHECK(code);
3,294✔
1773

1774
      // Add table to corresponding vnode's array
1775
      SArray** ppTables = taosHashGet(pVgroupHashmap, &vgInfo.vgId, sizeof(int32_t));
3,294✔
1776
      if (ppTables == NULL) {
3,294✔
1777
        SArray* pTables = taosArrayInit(16, sizeof(SUpdateTableTagVal));
2,927✔
1778
        RAW_NULL_CHECK(pTables);
2,927✔
1779
        RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &vgInfo.vgId, sizeof(int32_t), &pTables, sizeof(void*)));
2,927✔
1780
        ppTables = taosHashGet(pVgroupHashmap, &vgInfo.vgId, sizeof(int32_t));
2,927✔
1781
      }
1782

1783
      SArray* pTables = *ppTables;
3,294✔
1784
      RAW_NULL_CHECK(taosArrayPush(pTables, pTable));
3,294✔
1785
    }
1786

1787
    // Build and send separate request for each vnode
1788
    pArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
2,560✔
1789
    RAW_NULL_CHECK(pArray);
2,560✔
1790

1791
    void* pIter = taosHashIterate(pVgroupHashmap, NULL);
2,560✔
1792
    while (pIter) {
5,487✔
1793
      size_t keyLen = 0;
2,927✔
1794
      int32_t* pVgId = taosHashGetKey(pIter, &keyLen);
2,927✔
1795
      SArray* pTables = *(SArray**)pIter;
2,927✔
1796
      int32_t nTablesInVg = taosArrayGetSize(pTables);
2,927✔
1797

1798
      uDebug(LOG_ID_TAG " Type 1 batch alter: vgId:%d has %d tables", LOG_ID_VALUE, *pVgId, nTablesInVg);
2,927✔
1799

1800
      // Build SVAlterTbReq for this vnode
1801
      SVAlterTbReq vgReq = {0};
2,927✔
1802
      vgReq.action = req.action;
2,927✔
1803
      vgReq.tbName = req.tbName;
2,927✔
1804
      vgReq.source = TD_REQ_FROM_TAOX;
2,927✔
1805
      vgReq.tables = pTables;
2,927✔
1806

1807
      // Encode request
1808
      int tlen = 0;
2,927✔
1809
      tEncodeSize(tEncodeSVAlterTbReq, &vgReq, tlen, code);
2,927✔
1810
      if (code < 0) {
2,927✔
1811
        uError(LOG_ID_TAG " Type 1 batch alter encode failed for vgId:%d, code:%s",
×
1812
               LOG_ID_VALUE, *pVgId, tstrerror(code));
1813
        taosHashCancelIterate(pVgroupHashmap, pIter);
×
1814
        goto end;
×
1815
      }
1816

1817
      tlen += sizeof(SMsgHead);
2,927✔
1818
      void* pMsg = taosMemoryMalloc(tlen);
2,927✔
1819
      if (pMsg == NULL) {
2,927✔
1820
        code = terrno;
×
1821
        uError(LOG_ID_TAG " Type 1 batch alter malloc failed for vgId:%d, size:%d",
×
1822
               LOG_ID_VALUE, *pVgId, tlen);
1823
        taosHashCancelIterate(pVgroupHashmap, pIter);
×
1824
        goto end;
×
1825
      }
1826

1827
      ((SMsgHead*)pMsg)->vgId = htonl(*pVgId);
2,927✔
1828
      ((SMsgHead*)pMsg)->contLen = htonl(tlen);
2,927✔
1829
      void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
2,927✔
1830

1831
      SEncoder vgCoder = {0};
2,927✔
1832
      tEncoderInit(&vgCoder, pBuf, tlen - sizeof(SMsgHead));
2,927✔
1833
      code = tEncodeSVAlterTbReq(&vgCoder, &vgReq);
2,927✔
1834
      tEncoderClear(&vgCoder);
2,927✔
1835

1836
      if (code < 0) {
2,927✔
1837
        uError(LOG_ID_TAG " Type 1 batch alter encode2 failed for vgId:%d, code:%s",
×
1838
               LOG_ID_VALUE, *pVgId, tstrerror(code));
1839
        taosMemoryFree(pMsg);
×
1840
        taosHashCancelIterate(pVgroupHashmap, pIter);
×
1841
        goto end;
×
1842
      }
1843

1844
      // Create VgDataBlocks for this vnode
1845
      pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
2,927✔
1846
      if (pVgData == NULL) {
2,927✔
1847
        code = terrno;
×
1848
        taosMemoryFree(pMsg);
×
1849
        taosHashCancelIterate(pVgroupHashmap, pIter);
×
1850
        goto end;
×
1851
      }
1852

1853
      // Query vgroup info for first table to get endpoint
1854
      SUpdateTableTagVal* pFirstTable = taosArrayGet(pTables, 0);
2,927✔
1855
      SVgroupInfo vgInfo = {0};
2,927✔
1856
      SName pName = {0};
2,927✔
1857
      toName(pTscObj->acctId, pRequest->pDb, pFirstTable->tbName, &pName);
2,927✔
1858
      code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgInfo);
2,927✔
1859
      if (code != TSDB_CODE_SUCCESS) {
2,927✔
1860
        taosMemoryFree(pMsg);
×
1861
        taosHashCancelIterate(pVgroupHashmap, pIter);
×
1862
        goto end;
×
1863
      }
1864

1865
      pVgData->vg = vgInfo;
2,927✔
1866
      pVgData->pData = pMsg;
2,927✔
1867
      pVgData->size = tlen;
2,927✔
1868
      pVgData->numOfTables = nTablesInVg;
2,927✔
1869

1870
      if (taosArrayPush(pArray, &pVgData) == NULL) {
2,927✔
1871
        code = terrno;
×
1872
        taosMemoryFree(pMsg);
×
1873
        taosHashCancelIterate(pVgroupHashmap, pIter);
×
1874
        goto end;
×
1875
      }
1876

1877
      pVgData = NULL;  // Ownership transferred to pArray
2,927✔
1878
      pIter = taosHashIterate(pVgroupHashmap, pIter);
2,927✔
1879
    }
1880

1881
    uInfo(LOG_ID_TAG " Type 1 batch alter: grouped %d tables into %d vnodes",
2,560✔
1882
          LOG_ID_VALUE, nTables, (int32_t)taosArrayGetSize(pArray));
1883
  } else if (req.action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL) {
3,668✔
1884
    char dbFName[TSDB_DB_FNAME_LEN] = {0};
734✔
1885
    SName pName = {TSDB_TABLE_NAME_T, pTscObj->acctId, {0}, {0}};
734✔
1886
    tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
734✔
1887
    (void)tNameGetFullDbName(&pName, dbFName);
734✔
1888
    RAW_RETURN_CHECK(catalogGetDBVgList(pCatalog, &conn, dbFName, &pVgList));
734✔
1889

1890
    pArray = taosArrayInit(taosArrayGetSize(pVgList), sizeof(void*));
734✔
1891
    RAW_NULL_CHECK(pArray);
734✔
1892
    for (int i = 0; i < taosArrayGetSize(pVgList); ++i) {
2,202✔
1893
      SVgroupInfo* pInfo = (SVgroupInfo*)taosArrayGet(pVgList, i);
1,468✔
1894
      pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
1,468✔
1895
      RAW_NULL_CHECK(pVgData);
1,468✔
1896
      pVgData->vg = *pInfo;
1,468✔
1897

1898
      int tlen = 0;
1,468✔
1899
      req.source = TD_REQ_FROM_TAOX;
1,468✔
1900

1901
      tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
1,468✔
1902
      RAW_RETURN_CHECK(code);
1,468✔
1903
      tlen += sizeof(SMsgHead);
1,468✔
1904
      void* pMsg = taosMemoryMalloc(tlen);
1,468✔
1905
      RAW_NULL_CHECK(pMsg);
1,468✔
1906
      ((SMsgHead*)pMsg)->vgId = htonl(pInfo->vgId);
1,468✔
1907
      ((SMsgHead*)pMsg)->contLen = htonl(tlen);
1,468✔
1908
      void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
1,468✔
1909
      tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
1,468✔
1910
      RAW_RETURN_CHECK(tEncodeSVAlterTbReq(&coder, &req));
1,468✔
1911
      tEncoderClear(&coder);
1,468✔
1912

1913
      pVgData->pData = pMsg;
1,468✔
1914
      pVgData->size = tlen;
1,468✔
1915

1916
      pVgData->numOfTables = 1;
1,468✔
1917
      RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));
1,468✔
1918
      pVgData = NULL;  // Ownership transferred to pArray
1,468✔
1919
    }
1920
  } else {
1921
    // Single table or Type 2 modification - original logic
1922
    SVgroupInfo pInfo = {0};
2,934✔
1923
    SName       pName = {0};
2,934✔
1924
    toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
2,934✔
1925
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
2,934✔
1926
    pArray = taosArrayInit(1, sizeof(void*));
2,934✔
1927
    RAW_NULL_CHECK(pArray);
2,934✔
1928

1929
    pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
2,934✔
1930
    RAW_NULL_CHECK(pVgData);
2,934✔
1931
    pVgData->vg = pInfo;
2,934✔
1932

1933
    int tlen = 0;
2,934✔
1934
    req.source = TD_REQ_FROM_TAOX;
2,934✔
1935

1936
    if (strlen(tmqWriteRefDB) > 0) {
2,934✔
1937
      req.refDbName = tmqWriteRefDB;
2,934✔
1938
    }
1939

1940
    if (req.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF && tmqWriteCheckRef) {
2,934✔
1941
      RAW_RETURN_CHECK(checkColRefForAlter(pCatalog, &conn, pTscObj->acctId, req.refDbName, req.refTbName, req.refColName,
366✔
1942
        pRequest->pDb, req.tbName, req.colName));
1943
    }else if (req.action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF && tmqWriteCheckRef) {
2,568✔
1944
      RAW_RETURN_CHECK(checkColRefForAdd(pCatalog, &conn, pTscObj->acctId, req.refDbName, req.refTbName, req.refColName,
×
1945
        pRequest->pDb, req.tbName, req.colName, req.type, req.bytes));
1946
    }
1947

1948
    tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
2,934✔
1949
    RAW_RETURN_CHECK(code);
2,934✔
1950
    tlen += sizeof(SMsgHead);
2,934✔
1951
    void* pMsg = taosMemoryMalloc(tlen);
2,934✔
1952
    RAW_NULL_CHECK(pMsg);
2,934✔
1953
    ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
2,934✔
1954
    ((SMsgHead*)pMsg)->contLen = htonl(tlen);
2,934✔
1955
    void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
2,934✔
1956
    tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
2,934✔
1957
    RAW_RETURN_CHECK(tEncodeSVAlterTbReq(&coder, &req));
2,934✔
1958

1959
    pVgData->pData = pMsg;
2,934✔
1960
    pVgData->size = tlen;
2,934✔
1961

1962
    pVgData->numOfTables = 1;
2,934✔
1963
    RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));
2,934✔
1964
    pVgData = NULL;
2,934✔
1965
  }
1966

1967
  pQuery = NULL;
6,228✔
1968
  RAW_RETURN_CHECK(nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery));
6,228✔
1969
  if (NULL == pQuery) goto end;
6,228✔
1970
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
6,228✔
1971
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
6,228✔
1972
  pQuery->stableQuery = false;
6,228✔
1973
  pQuery->pRoot = NULL;
6,228✔
1974
  RAW_RETURN_CHECK(nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot));
6,228✔
1975
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));
6,228✔
1976

1977
  launchQueryImpl(pRequest, pQuery, true, NULL);
6,228✔
1978
  pArray = NULL;
6,228✔
1979

1980
  code = pRequest->code;
6,228✔
1981
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST || code == TSDB_CODE_NOT_FOUND) {
6,228✔
1982
    code = TSDB_CODE_SUCCESS;
×
1983
  }
1984

1985
  if (pRequest->code == TSDB_CODE_SUCCESS) {
6,228✔
1986
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
6,228✔
1987
    if (pRes->res != NULL) {
6,228✔
1988
      code = handleAlterTbExecRes(pRes->res, pCatalog);
2,934✔
1989
    }
1990
  }
1991
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));
6,228✔
1992

1993
end:
6,228✔
1994
  // Cleanup vnode grouping hashmap
1995
  if (pVgroupHashmap != NULL) {
6,228✔
1996
    void* pIter = taosHashIterate(pVgroupHashmap, NULL);
2,560✔
1997
    while (pIter) {
5,487✔
1998
      SArray* pTables = *(SArray**)pIter;
2,927✔
1999
      taosArrayDestroy(pTables);
2,927✔
2000
      pIter = taosHashIterate(pVgroupHashmap, pIter);
2,927✔
2001
    }
2002
    taosHashCleanup(pVgroupHashmap);
2,560✔
2003
  }
2004

2005
  for (int i = 0; i < taosArrayGetSize(pArray); ++i) {
6,228✔
2006
    SVgDataBlocks* pData = (SVgDataBlocks*)taosArrayGetP(pArray, i);
×
2007
    if (pData && pData->pData) {
×
2008
      taosMemoryFreeClear(pData->pData);
×
2009
      taosMemoryFreeClear(pData);
×
2010
    }
2011
  }
2012
  taosArrayDestroy(pArray);
6,228✔
2013
  
2014
  if (pVgData) {
6,228✔
2015
    taosMemoryFreeClear(pVgData->pData);
×
2016
    taosMemoryFreeClear(pVgData);
×
2017
  }
2018
  taosArrayDestroy(pVgList);
6,228✔
2019
  destroyRequest(pRequest);
6,228✔
2020
  tDecoderClear(&dcoder);
6,228✔
2021
  qDestroyQuery(pQuery);
6,228✔
2022
  destroyAlterTbReq(&req);
6,228✔
2023
  tEncoderClear(&coder);
6,228✔
2024
  RAW_LOG_END
6,228✔
2025
  return code;
6,228✔
2026
}
2027

2028
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
297✔
2029
                                     int numFields) {
2030
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, 0);
297✔
2031
}
2032

2033
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
297✔
2034
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
2035
  if (taos == NULL || pData == NULL || tbname == NULL) {
297✔
2036
    uError("invalid parameter in %s", __func__);
×
2037
    return TSDB_CODE_INVALID_PARA;
×
2038
  }
2039
  int32_t     code = TSDB_CODE_SUCCESS;
297✔
2040
  int32_t     lino = 0;
297✔
2041
  STableMeta* pTableMeta = NULL;
297✔
2042
  SQuery*     pQuery = NULL;
297✔
2043
  SHashObj*   pVgHash = NULL;
297✔
2044

2045
  SRequestObj* pRequest = NULL;
297✔
2046
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
297✔
2047

2048
  uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
297✔
2049
         rows, pData, tbname, fields, numFields);
2050

2051
  pRequest->syncQuery = true;
297✔
2052
  if (!pRequest->pDb) {
297✔
2053
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
2054
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
2055
    goto end;
×
2056
  }
2057

2058
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
297✔
2059
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
297✔
2060
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
297✔
2061

2062
  struct SCatalog* pCatalog = NULL;
297✔
2063
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
297✔
2064

2065
  SRequestConnInfo conn = {0};
297✔
2066
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
297✔
2067
  conn.requestId = pRequest->requestId;
297✔
2068
  conn.requestObjRefId = pRequest->self;
297✔
2069
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
297✔
2070

2071
  SVgroupInfo vgData = {0};
297✔
2072
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
297✔
2073
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
297✔
2074
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
297✔
2075
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
297✔
2076
  RAW_NULL_CHECK(pVgHash);
297✔
2077
  RAW_RETURN_CHECK(
297✔
2078
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
2079
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
297✔
2080
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
297✔
2081

2082
  launchQueryImpl(pRequest, pQuery, true, NULL);
297✔
2083
  code = pRequest->code;
297✔
2084
  uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code));
297✔
2085

2086
end:
297✔
2087
  taosMemoryFreeClear(pTableMeta);
297✔
2088
  qDestroyQuery(pQuery);
297✔
2089
  destroyRequest(pRequest);
297✔
2090
  taosHashCleanup(pVgHash);
297✔
2091
  RAW_LOG_END
297✔
2092
  return code;
297✔
2093
}
2094

2095
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
2,079✔
2096
  return taos_write_raw_block_with_reqid(taos, rows, pData, tbname, 0);
2,079✔
2097
}
2098

2099
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
2,079✔
2100
  if (taos == NULL || pData == NULL || tbname == NULL) {
2,079✔
2101
    return TSDB_CODE_INVALID_PARA;
×
2102
  }
2103
  int32_t     code = TSDB_CODE_SUCCESS;
2,079✔
2104
  int32_t     lino = 0;
2,079✔
2105
  STableMeta* pTableMeta = NULL;
2,079✔
2106
  SQuery*     pQuery = NULL;
2,079✔
2107
  SHashObj*   pVgHash = NULL;
2,079✔
2108

2109
  SRequestObj* pRequest = NULL;
2,079✔
2110
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
2,079✔
2111

2112
  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);
2,079✔
2113

2114
  pRequest->syncQuery = true;
2,079✔
2115
  if (!pRequest->pDb) {
2,079✔
2116
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
2117
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
2118
    goto end;
×
2119
  }
2120

2121
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
2,079✔
2122
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
2,079✔
2123
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
2,079✔
2124

2125
  struct SCatalog* pCatalog = NULL;
2,079✔
2126
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
2,079✔
2127

2128
  SRequestConnInfo conn = {0};
2,079✔
2129
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
2,079✔
2130
  conn.requestId = pRequest->requestId;
2,079✔
2131
  conn.requestObjRefId = pRequest->self;
2,079✔
2132
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
2,079✔
2133

2134
  SVgroupInfo vgData = {0};
2,079✔
2135
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
2,079✔
2136
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
2,079✔
2137
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
1,782✔
2138
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
1,782✔
2139
  RAW_NULL_CHECK(pVgHash);
1,782✔
2140
  RAW_RETURN_CHECK(
1,782✔
2141
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
2142
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false));
1,782✔
2143
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
1,188✔
2144

2145
  launchQueryImpl(pRequest, pQuery, true, NULL);
1,188✔
2146
  code = pRequest->code;
1,188✔
2147
  uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code));
1,188✔
2148

2149
end:
2,079✔
2150
  taosMemoryFreeClear(pTableMeta);
2,079✔
2151
  qDestroyQuery(pQuery);
2,079✔
2152
  destroyRequest(pRequest);
2,079✔
2153
  taosHashCleanup(pVgHash);
2,079✔
2154
  RAW_LOG_END
2,079✔
2155
  return code;
2,079✔
2156
}
2157

2158
static void* getRawDataFromRes(void* pRetrieve) {
7,962✔
2159
  if (pRetrieve == NULL) {
7,962✔
2160
    uError("invalid parameter in %s", __func__);
×
2161
    return NULL;
×
2162
  }
2163
  void* rawData = NULL;
7,962✔
2164
  // deal with compatibility
2165
  if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
7,962✔
2166
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
2167
  } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
7,962✔
2168
             *(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
×
2169
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
7,962✔
2170
  }
2171
  return rawData;
7,962✔
2172
}
2173

2174
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
367✔
2175
  if (rsp == NULL || pHashObj == NULL) {
367✔
2176
    uError("invalid parameter in %s", __func__);
×
2177
    return TSDB_CODE_INVALID_PARA;
×
2178
  }
2179
  // find schema data info
2180
  int32_t       code = 0;
367✔
2181
  int32_t       lino = 0;
367✔
2182
  SVCreateTbReq pCreateReq = {0};
367✔
2183
  SDecoder      decoderTmp = {0};
367✔
2184
  RAW_LOG_START
367✔
2185
  for (int j = 0; j < rsp->createTableNum; j++) {
1,101✔
2186
    void** dataTmp = taosArrayGet(rsp->createTableReq, j);
734✔
2187
    RAW_NULL_CHECK(dataTmp);
734✔
2188
    int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j);
734✔
2189
    RAW_NULL_CHECK(lenTmp);
734✔
2190

2191
    tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
734✔
2192
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));
734✔
2193

2194
    if (pCreateReq.type != TSDB_CHILD_TABLE) {
734✔
2195
      uError("invalid table type %d in %s", pCreateReq.type, __func__);
×
2196
      code = TSDB_CODE_INVALID_MSG;
×
2197
      goto end;
×
2198
    }
2199
    if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL) {
734✔
2200
      RAW_RETURN_CHECK(
734✔
2201
          taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq)));
2202
    } else {
2203
      tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
×
2204
    }
2205

2206
    tDecoderClear(&decoderTmp);
734✔
2207
    pCreateReq = (SVCreateTbReq){0};
734✔
2208
  }
2209

2210
end:
367✔
2211
  tDecoderClear(&decoderTmp);
367✔
2212
  tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
367✔
2213
  RAW_LOG_END
367✔
2214
  return code;
367✔
2215
}
2216

2217
typedef enum {
2218
  WRITE_RAW_INIT_START = 0,
2219
  WRITE_RAW_INIT_OK,
2220
  WRITE_RAW_INIT_FAIL,
2221
} WRITE_RAW_INIT_STATUS;
2222

2223
static SHashObj* writeRawCache = NULL;
2224
static int8_t    initFlag = 0;
2225
static int8_t    initedFlag = WRITE_RAW_INIT_START;
2226

2227
typedef struct {
2228
  SHashObj* pVgHash;
2229
  SHashObj* pNameHash;
2230
  SHashObj* pMetaHash;
2231
} rawCacheInfo;
2232

2233
typedef struct {
2234
  SVgroupInfo vgInfo;
2235
  int64_t     uid;
2236
  int64_t     suid;
2237
} tbInfo;
2238

2239
static void tmqFreeMeta(void* data) {
726✔
2240
  if (data == NULL) {
726✔
2241
    uError("invalid parameter in %s", __func__);
×
2242
    return;
×
2243
  }
2244
  STableMeta* pTableMeta = *(STableMeta**)data;
726✔
2245
  taosMemoryFree(pTableMeta);
726✔
2246
}
2247

2248
static void freeRawCache(void* data) {
×
2249
  if (data == NULL) {
×
2250
    uError("invalid parameter in %s", __func__);
×
2251
    return;
×
2252
  }
2253
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
2254
  taosHashCleanup(pRawCache->pMetaHash);
×
2255
  taosHashCleanup(pRawCache->pNameHash);
×
2256
  taosHashCleanup(pRawCache->pVgHash);
×
2257
}
2258

2259
static int32_t initRawCacheHash() {
2,885✔
2260
  if (writeRawCache == NULL) {
2,885✔
2261
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
2,885✔
2262
    if (writeRawCache == NULL) {
2,885✔
2263
      return terrno;
×
2264
    }
2265
    taosHashSetFreeFp(writeRawCache, freeRawCache);
2,885✔
2266
  }
2267
  return 0;
2,885✔
2268
}
2269

2270
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
×
2271
  if (rawData == NULL || pSW == NULL) {
×
2272
    return false;
×
2273
  }
2274
  if (pTableMeta == NULL) {
×
2275
    uError("invalid parameter in %s", __func__);
×
2276
    return false;
×
2277
  }
2278
  char* p = (char*)rawData;
×
2279
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
2280
  // column length |
2281
  p += sizeof(int32_t);
×
2282
  p += sizeof(int32_t);
×
2283
  p += sizeof(int32_t);
×
2284
  p += sizeof(int32_t);
×
2285
  p += sizeof(int32_t);
×
2286
  p += sizeof(uint64_t);
×
2287
  int8_t* fields = p;
×
2288

2289
  if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
×
2290
    return true;
×
2291
  }
2292

2293
  for (int i = 0; i < pSW->nCols; i++) {
×
2294
    int j = 0;
×
2295
    for (; j < pTableMeta->tableInfo.numOfColumns; j++) {
×
2296
      SSchema*    pColSchema = &pTableMeta->schema[j];
×
2297
      SSchemaExt* pColExtSchema = &pTableMeta->schemaExt[j];
×
2298
      char*       fieldName = pSW->pSchema[i].name;
×
2299

2300
      if (strcmp(pColSchema->name, fieldName) == 0) {
×
2301
        if (checkSchema(pColSchema, pColExtSchema, fields, NULL, 0) != 0) {
×
2302
          return true;
×
2303
        }
2304
        break;
×
2305
      }
2306
    }
2307
    fields += sizeof(int8_t) + sizeof(int32_t);
×
2308

2309
    if (j == pTableMeta->tableInfo.numOfColumns) return true;
×
2310
  }
2311
  return false;
×
2312
}
2313

2314
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
6,151✔
2315
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) {
6,151✔
2316
    uError("invalid parameter in %s", __func__);
×
2317
    return TSDB_CODE_INVALID_PARA;
×
2318
  }
2319
  int32_t code = 0;
6,151✔
2320
  int32_t lino = 0;
6,151✔
2321
  RAW_LOG_START
6,151✔
2322
  void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
6,151✔
2323
  if (cacheInfo == NULL) {
6,151✔
2324
    *pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
6,151✔
2325
    RAW_NULL_CHECK(*pVgHash);
6,151✔
2326
    *pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
6,151✔
2327
    RAW_NULL_CHECK(*pNameHash);
6,151✔
2328
    *pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
6,151✔
2329
    RAW_NULL_CHECK(*pMetaHash);
6,151✔
2330
    taosHashSetFreeFp(*pMetaHash, tmqFreeMeta);
6,151✔
2331
    rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash};
6,151✔
2332
    RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));
6,151✔
2333
  } else {
2334
    rawCacheInfo* info = (rawCacheInfo*)cacheInfo;
×
2335
    *pVgHash = info->pVgHash;
×
2336
    *pNameHash = info->pNameHash;
×
2337
    *pMetaHash = info->pMetaHash;
×
2338
  }
2339

2340
end:
6,151✔
2341
  if (code != 0) {
6,151✔
2342
    taosHashCleanup(*pMetaHash);
×
2343
    taosHashCleanup(*pNameHash);
×
2344
    taosHashCleanup(*pVgHash);
×
2345
  }
2346
  RAW_LOG_END
6,151✔
2347
  return code;
6,151✔
2348
}
2349

2350
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
6,151✔
2351
  if (taos == NULL || pRequest == NULL || pCatalog == NULL || conn == NULL) {
6,151✔
2352
    uError("invalid parameter in %s", __func__);
×
2353
    return TSDB_CODE_INVALID_PARA;
×
2354
  }
2355
  int32_t code = 0;
6,151✔
2356
  int32_t lino = 0;
6,151✔
2357
  RAW_LOG_START
6,151✔
2358
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));
6,151✔
2359
  (*pRequest)->syncQuery = true;
6,151✔
2360
  if (!(*pRequest)->pDb) {
6,151✔
2361
    uError("%s no database selected", __func__);
×
2362
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
2363
    goto end;
×
2364
  }
2365

2366
  RAW_RETURN_CHECK(catalogGetHandle((*pRequest)->pTscObj->pAppInfo->clusterId, pCatalog));
6,151✔
2367
  conn->pTrans = (*pRequest)->pTscObj->pAppInfo->pTransporter;
6,151✔
2368
  conn->requestId = (*pRequest)->requestId;
6,151✔
2369
  conn->requestObjRefId = (*pRequest)->self;
6,151✔
2370
  conn->mgmtEps = getEpSet_s(&(*pRequest)->pTscObj->pAppInfo->mgmtEp);
6,151✔
2371

2372
end:
6,151✔
2373
  RAW_LOG_END
6,151✔
2374
  return code;
6,151✔
2375
}
2376

2377
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
2378
static int32_t  decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func,
6,151✔
2379
                              SMqRspObj* rspObj) {
2380
  if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) {
6,151✔
2381
    uError("invalid parameter in %s", __func__);
×
2382
    return TSDB_CODE_INVALID_PARA;
×
2383
  }
2384
  int8_t dataVersion = *(int8_t*)data;
6,151✔
2385
  if (dataVersion >= MQ_DATA_RSP_VERSION) {
6,151✔
2386
    data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
6,151✔
2387
    if (dataLen < sizeof(int8_t) + sizeof(int32_t)) {
6,151✔
2388
      return TSDB_CODE_INVALID_PARA;
×
2389
    }
2390
    dataLen -= sizeof(int8_t) + sizeof(int32_t);
6,151✔
2391
  }
2392

2393
  rspObj->resIter = -1;
6,151✔
2394
  tDecoderInit(decoder, data, dataLen);
6,151✔
2395
  int32_t code = func(decoder, &rspObj->dataRsp);
6,151✔
2396
  if (code != 0) {
6,151✔
2397
    SET_ERROR_MSG("decode mq taosx data rsp failed");
×
2398
  }
2399
  return code;
6,151✔
2400
}
2401

2402
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
7,962✔
2403
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
2404
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
2405
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
7,962✔
2406
      pMeta == NULL) {
2407
    uError("invalid parameter in %s", __func__);
×
2408
    return TSDB_CODE_INVALID_PARA;
×
2409
  }
2410
  int32_t code = 0;
7,962✔
2411
  int32_t lino = 0;
7,962✔
2412
  RAW_LOG_START
7,962✔
2413
  STableMeta* pTableMeta = NULL;
7,962✔
2414
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
7,962✔
2415
  if (tmpInfo == NULL || retry > 0) {
7,962✔
2416
    tbInfo info = {0};
7,962✔
2417

2418
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
7,962✔
2419
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
7,962✔
2420
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
734✔
2421
    }
2422
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
7,962✔
2423
    info.uid = pTableMeta->uid;
7,962✔
2424
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
7,962✔
2425
      info.suid = pTableMeta->suid;
5,776✔
2426
    } else {
2427
      info.suid = pTableMeta->uid;
2,186✔
2428
    }
2429
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
7,962✔
2430
    RAW_RETURN_CHECK(code);
7,962✔
2431

2432
    uDebug("put table meta to hash1, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d", info.suid,
7,962✔
2433
           taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2434
    if (pCreateReqDst) {
7,962✔
2435
      pTableMeta->vgId = info.vgInfo.vgId;
734✔
2436
      pTableMeta->uid = pCreateReqDst->uid;
734✔
2437
      pCreateReqDst->ctb.suid = pTableMeta->suid;
734✔
2438
    }
2439

2440
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
7,962✔
2441
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
7,962✔
2442
    RAW_RETURN_CHECK(
7,962✔
2443
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
2444
  }
2445

2446
  if (pTableMeta == NULL || retry > 0) {
7,962✔
2447
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
×
2448
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
×
2449
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
×
2450
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
×
2451
      RAW_RETURN_CHECK(code);
×
2452
      uDebug("put table meta to hash2, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d",
×
2453
             tmpInfo->suid, taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2454
    } else {
2455
      pTableMeta = *pTableMetaTmp;
×
2456
      pTableMeta->uid = tmpInfo->uid;
×
2457
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
×
2458
    }
2459
  }
2460
  *pMeta = pTableMeta;
7,962✔
2461
  pTableMeta = NULL;
7,962✔
2462

2463
end:
7,962✔
2464
  taosMemoryFree(pTableMeta);
7,962✔
2465
  RAW_LOG_END
7,962✔
2466
  return code;
7,962✔
2467
}
2468

2469
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
5,784✔
2470
  if (taos == NULL || data == NULL) {
5,784✔
2471
    uError("invalid parameter in %s", __func__);
×
2472
    return TSDB_CODE_INVALID_PARA;
×
2473
  }
2474
  int32_t   code = TSDB_CODE_SUCCESS;
5,784✔
2475
  int32_t   lino = 0;
5,784✔
2476
  SQuery*   pQuery = NULL;
5,784✔
2477
  SMqRspObj rspObj = {0};
5,784✔
2478
  SDecoder  decoder = {0};
5,784✔
2479

2480
  SRequestObj*     pRequest = NULL;
5,784✔
2481
  SCatalog*        pCatalog = NULL;
5,784✔
2482
  SRequestConnInfo conn = {0};
5,784✔
2483
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
5,784✔
2484
  uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
5,784✔
2485
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
5,784✔
2486

2487
  SHashObj* pVgHash = NULL;
5,784✔
2488
  SHashObj* pNameHash = NULL;
5,784✔
2489
  SHashObj* pMetaHash = NULL;
5,784✔
2490
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
5,784✔
2491
  int retry = 0;
5,784✔
2492
  while (1) {
2493
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
5,784✔
2494
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
5,784✔
2495
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
13,012✔
2496
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
7,228✔
2497
      RAW_NULL_CHECK(tbName);
7,228✔
2498
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
7,228✔
2499
      RAW_NULL_CHECK(pSW);
7,228✔
2500
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
7,228✔
2501
      RAW_NULL_CHECK(pRetrieve);
7,228✔
2502
      void* rawData = getRawDataFromRes(pRetrieve);
7,228✔
2503
      RAW_NULL_CHECK(rawData);
7,228✔
2504

2505
      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
7,228✔
2506
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
7,228✔
2507
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
7,228✔
2508
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
7,228✔
2509

2510
      STableMeta* pTableMeta = NULL;
7,228✔
2511
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, pSW,
7,228✔
2512
                                        rawData, retry));
2513
      char err[ERR_MSG_LEN] = {0};
7,228✔
2514
      code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
7,228✔
2515
      if (code != TSDB_CODE_SUCCESS) {
7,228✔
2516
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
×
2517
        goto end;
×
2518
      }
2519
    }
2520
    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
5,784✔
2521
    launchQueryImpl(pRequest, pQuery, true, NULL);
5,784✔
2522
    code = pRequest->code;
5,784✔
2523

2524
    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
5,784✔
2525
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
×
2526
      qDestroyQuery(pQuery);
×
2527
      pQuery = NULL;
×
2528
      rspObj.resIter = -1;
×
2529
      continue;
×
2530
    }
2531
    break;
5,784✔
2532
  }
2533
  uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
5,784✔
2534

2535
end:
5,784✔
2536
  tDeleteMqDataRsp(&rspObj.dataRsp);
5,784✔
2537
  tDecoderClear(&decoder);
5,784✔
2538
  qDestroyQuery(pQuery);
5,784✔
2539
  destroyRequest(pRequest);
5,784✔
2540
  RAW_LOG_END
5,784✔
2541
  return code;
5,784✔
2542
}
2543

2544
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
367✔
2545
  if (taos == NULL || data == NULL) {
367✔
2546
    uError("invalid parameter in %s", __func__);
×
2547
    return TSDB_CODE_INVALID_PARA;
×
2548
  }
2549
  int32_t   code = TSDB_CODE_SUCCESS;
367✔
2550
  int32_t   lino = 0;
367✔
2551
  SQuery*   pQuery = NULL;
367✔
2552
  SMqRspObj rspObj = {0};
367✔
2553
  SDecoder  decoder = {0};
367✔
2554
  SHashObj* pCreateTbHash = NULL;
367✔
2555

2556
  SRequestObj*     pRequest = NULL;
367✔
2557
  SCatalog*        pCatalog = NULL;
367✔
2558
  SRequestConnInfo conn = {0};
367✔
2559

2560
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
367✔
2561
  uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
367✔
2562
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));
367✔
2563

2564
  pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
367✔
2565
  RAW_NULL_CHECK(pCreateTbHash);
367✔
2566
  RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
367✔
2567

2568
  SHashObj* pVgHash = NULL;
367✔
2569
  SHashObj* pNameHash = NULL;
367✔
2570
  SHashObj* pMetaHash = NULL;
367✔
2571
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
367✔
2572
  int retry = 0;
367✔
2573
  while (1) {
2574
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
367✔
2575
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
367✔
2576
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
1,101✔
2577
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
734✔
2578
      RAW_NULL_CHECK(tbName);
734✔
2579
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
734✔
2580
      RAW_NULL_CHECK(pSW);
734✔
2581
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
734✔
2582
      RAW_NULL_CHECK(pRetrieve);
734✔
2583
      void* rawData = getRawDataFromRes(pRetrieve);
734✔
2584
      RAW_NULL_CHECK(rawData);
734✔
2585

2586
      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
734✔
2587
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
734✔
2588
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
734✔
2589
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
734✔
2590

2591
      // find schema data info
2592
      SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, pName.tname, strlen(pName.tname));
734✔
2593
      STableMeta*    pTableMeta = NULL;
734✔
2594
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &pName,
734✔
2595
                                        &pTableMeta, pSW, rawData, retry));
2596
      char err[ERR_MSG_LEN] = {0};
734✔
2597
      code =
2598
          rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
734✔
2599
      if (code != TSDB_CODE_SUCCESS) {
734✔
2600
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
×
2601
        goto end;
×
2602
      }
2603
    }
2604
    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
367✔
2605
    launchQueryImpl(pRequest, pQuery, true, NULL);
367✔
2606
    code = pRequest->code;
367✔
2607

2608
    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
367✔
2609
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
×
2610
      qDestroyQuery(pQuery);
×
2611
      pQuery = NULL;
×
2612
      rspObj.resIter = -1;
×
2613
      continue;
×
2614
    }
2615
    break;
367✔
2616
  }
2617
  uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
367✔
2618

2619
end:
367✔
2620
  tDeleteSTaosxRsp(&rspObj.dataRsp);
367✔
2621
  void* pIter = taosHashIterate(pCreateTbHash, NULL);
367✔
2622
  while (pIter) {
1,101✔
2623
    tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
734✔
2624
    pIter = taosHashIterate(pCreateTbHash, pIter);
734✔
2625
  }
2626
  taosHashCleanup(pCreateTbHash);
367✔
2627
  tDecoderClear(&decoder);
367✔
2628
  qDestroyQuery(pQuery);
367✔
2629
  destroyRequest(pRequest);
367✔
2630
  RAW_LOG_END
367✔
2631
  return code;
367✔
2632
}
2633

2634
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
×
2635
  if (taos == NULL || data == NULL) {
×
2636
    uError("invalid parameter in %s", __func__);
×
2637
    return TSDB_CODE_INVALID_PARA;
×
2638
  }
2639
  int32_t   code = TSDB_CODE_SUCCESS;
×
2640
  int32_t   lino = 0;
×
2641
  SQuery*   pQuery = NULL;
×
2642
  SHashObj* pVgroupHash = NULL;
×
2643
  SMqRspObj rspObj = {0};
×
2644
  SDecoder  decoder = {0};
×
2645

2646
  SRequestObj*     pRequest = NULL;
×
2647
  SCatalog*        pCatalog = NULL;
×
2648
  SRequestConnInfo conn = {0};
×
2649

2650
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
×
2651
  uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
×
2652
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
×
2653

2654
  SHashObj* pVgHash = NULL;
×
2655
  SHashObj* pNameHash = NULL;
×
2656
  SHashObj* pMetaHash = NULL;
×
2657
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
×
2658
  int retry = 0;
×
2659
  while (1) {
×
2660
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
×
2661
    uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
×
2662
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
×
2663
    pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
×
2664
    RAW_NULL_CHECK(pVgroupHash);
×
2665
    pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
×
2666
    RAW_NULL_CHECK(pStmt->pVgDataBlocks);
×
2667

2668
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
×
2669
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
×
2670
      RAW_NULL_CHECK(tbName);
×
2671
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
×
2672
      RAW_NULL_CHECK(pRetrieve);
×
2673
      void* rawData = getRawDataFromRes(pRetrieve);
×
2674
      RAW_NULL_CHECK(rawData);
×
2675

2676
      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
×
2677
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
×
2678
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
×
2679
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
×
2680

2681
      // find schema data info
2682
      STableMeta* pTableMeta = NULL;
×
2683
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, NULL,
×
2684
                                        NULL, retry));
2685
      char err[ERR_MSG_LEN] = {0};
×
2686
      code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
×
2687
      if (code != TSDB_CODE_SUCCESS) {
×
2688
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
×
2689
        goto end;
×
2690
      }
2691
    }
2692
    taosHashCleanup(pVgroupHash);
×
2693
    pVgroupHash = NULL;
×
2694

2695
    RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
×
2696
    launchQueryImpl(pRequest, pQuery, true, NULL);
×
2697
    code = pRequest->code;
×
2698

2699
    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
×
2700
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
×
2701
      qDestroyQuery(pQuery);
×
2702
      pQuery = NULL;
×
2703
      rspObj.resIter = -1;
×
2704
      continue;
×
2705
    }
2706
    break;
×
2707
  }
2708
  uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
×
2709

2710
end:
×
2711
  tDeleteMqDataRsp(&rspObj.dataRsp);
×
2712
  tDecoderClear(&decoder);
×
2713
  qDestroyQuery(pQuery);
×
2714
  taosHashCleanup(pVgroupHash);
×
2715
  destroyRequest(pRequest);
×
2716
  RAW_LOG_END
×
2717
  return code;
×
2718
}
2719

2720
static int32_t processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
29,513✔
2721
  if (pMetaRsp == NULL || meta == NULL) {
29,513✔
2722
    uError("invalid parameter in %s", __func__);
×
2723
    return TSDB_CODE_INVALID_PARA;
×
2724
  }
2725
  int32_t code = 0;
29,513✔
2726
  int32_t lino = 0;
29,513✔
2727
  RAW_LOG_START
29,513✔
2728
  if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) {
29,513✔
2729
    RAW_RETURN_CHECK(processCreateStb(pMetaRsp, meta));
6,039✔
2730
  } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_STB) {
23,474✔
2731
    RAW_RETURN_CHECK(processAlterStb(pMetaRsp, meta));
1,902✔
2732
  } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_STB) {
21,572✔
2733
    RAW_RETURN_CHECK(processDropSTable(pMetaRsp, meta));
×
2734
  } else if (pMetaRsp->resMsgType == TDMT_VND_CREATE_TABLE) {
21,572✔
2735
    RAW_RETURN_CHECK(processCreateTable(pMetaRsp, meta));
15,012✔
2736
  } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_TABLE) {
6,560✔
2737
    RAW_RETURN_CHECK(processAlterTable(pMetaRsp, meta));
6,560✔
2738
  } else if (pMetaRsp->resMsgType == TDMT_VND_DROP_TABLE) {
×
2739
    RAW_RETURN_CHECK(processDropTable(pMetaRsp, meta));
×
2740
  } else if (pMetaRsp->resMsgType == TDMT_VND_DELETE) {
×
2741
    RAW_RETURN_CHECK(processDeleteTable(pMetaRsp, meta));
×
2742
  }
2743

2744
end:
×
2745
  RAW_LOG_END
29,513✔
2746
  return code;
29,513✔
2747
}
2748

2749
static int32_t processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
2,356✔
2750
  if (pMsgRsp == NULL || string == NULL) {
2,356✔
2751
    uError("invalid parameter in %s", __func__);
×
2752
    return TSDB_CODE_INVALID_PARA;
×
2753
  }
2754
  SDecoder        coder = {0};
2,356✔
2755
  SMqBatchMetaRsp rsp = {0};
2,356✔
2756
  int32_t         code = 0;
2,356✔
2757
  int32_t         lino = 0;
2,356✔
2758
  cJSON*          pJson = NULL;
2,356✔
2759
  tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
2,356✔
2760
  RAW_RETURN_CHECK(tDecodeMqBatchMetaRsp(&coder, &rsp));
2,356✔
2761

2762
  pJson = cJSON_CreateObject();
2,356✔
2763
  RAW_NULL_CHECK(pJson);
2,356✔
2764
  RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION));
2,356✔
2765
  cJSON* pMetaArr = cJSON_AddArrayToObject(pJson, "metas");
2,356✔
2766
  RAW_NULL_CHECK(pMetaArr);
2,356✔
2767

2768
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
2,356✔
2769
  for (int32_t i = 0; i < num; i++) {
10,795✔
2770
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
8,439✔
2771
    RAW_NULL_CHECK(len);
8,439✔
2772
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
8,439✔
2773
    RAW_NULL_CHECK(tmpBuf);
8,439✔
2774
    SDecoder   metaCoder = {0};
8,439✔
2775
    SMqMetaRsp metaRsp = {0};
8,439✔
2776
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
8,439✔
2777
    RAW_RETURN_CHECK(tDecodeMqMetaRsp(&metaCoder, &metaRsp));
8,439✔
2778
    cJSON* pItem = NULL;
8,439✔
2779
    RAW_RETURN_CHECK(processSimpleMeta(&metaRsp, &pItem));
8,439✔
2780
    tDeleteMqMetaRsp(&metaRsp);
8,439✔
2781
    if (pItem != NULL) RAW_FALSE_CHECK(cJSON_AddItemToArray(pMetaArr, pItem));
8,439✔
2782
  }
2783

2784
  char* fullStr = cJSON_PrintUnformatted(pJson);
2,356✔
2785
  *string = fullStr;
2,356✔
2786

2787
end:
2,356✔
2788
  cJSON_Delete(pJson);
2,356✔
2789
  tDeleteMqBatchMetaRsp(&rsp);
2,356✔
2790
  RAW_LOG_END
2,356✔
2791
  return code;
2,356✔
2792
}
2793

2794
char* tmq_get_json_meta(TAOS_RES* res) {
24,479✔
2795
  int32_t code = TSDB_CODE_SUCCESS;
24,479✔
2796
  int32_t lino = 0;
24,479✔
2797
  char*   string = NULL;
24,479✔
2798
  RAW_LOG_START
24,479✔
2799
  RAW_NULL_CHECK(res);
24,479✔
2800
  RAW_FALSE_CHECK(TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res));
24,479✔
2801

2802
  SMqRspObj* rspObj = (SMqRspObj*)res;
24,479✔
2803
  if (TD_RES_TMQ_METADATA(res)) {
24,479✔
2804
    RAW_RETURN_CHECK(processAutoCreateTable(&rspObj->dataRsp, &string));
1,049✔
2805
  } else if (TD_RES_TMQ_BATCH_META(res)) {
23,430✔
2806
    RAW_RETURN_CHECK(processBatchMetaToJson(&rspObj->batchMetaRsp, &string));
2,356✔
2807
  } else if (TD_RES_TMQ_META(res)) {
21,074✔
2808
    cJSON* pJson = NULL;
21,074✔
2809
    RAW_RETURN_CHECK(processSimpleMeta(&rspObj->metaRsp, &pJson));
21,074✔
2810
    string = cJSON_PrintUnformatted(pJson);
21,074✔
2811
    cJSON_Delete(pJson);
21,074✔
2812
  } else {
2813
    uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res);
×
2814
  }
2815

2816
  uDebug("tmq_get_json_meta string:%s", string);
24,479✔
2817

2818
end:
24,479✔
2819
  RAW_LOG_END
24,479✔
2820
  return string;
24,479✔
2821
}
2822

2823
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
24,479✔
2824

2825
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
6,187✔
2826
  if (pRsp == NULL) {
6,187✔
2827
    uError("invalid parameter in %s", __func__);
×
2828
    return TSDB_CODE_INVALID_PARA;
×
2829
  }
2830
  int32_t pos = 0;
6,187✔
2831
  int32_t code = 0;
6,187✔
2832
  int32_t lino = 0;
6,187✔
2833
  RAW_LOG_START
6,187✔
2834
  SEncoder coder = {0};
6,187✔
2835
  tEncoderInit(&coder, NULL, 0);
6,187✔
2836
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset));
6,187✔
2837
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset));
6,187✔
2838
  pos = coder.pos;
6,187✔
2839
  tEncoderClear(&coder);
6,187✔
2840

2841
end:
6,187✔
2842
  if (code != 0) {
6,187✔
2843
    uError("getOffSetLen failed, code:%d", code);
×
2844
    return code;
×
2845
  } else {
2846
    uDebug("getOffSetLen success, len:%d", pos);
6,187✔
2847
    return pos;
6,187✔
2848
  }
2849
}
2850

2851
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
2852
static int32_t  encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
6,187✔
2853
  if (raw == NULL || encodeFunc == NULL || rspObj == NULL) {
6,187✔
2854
    uError("invalid parameter in %s", __func__);
×
2855
    return TSDB_CODE_INVALID_PARA;
×
2856
  }
2857
  uint32_t len = 0;
6,187✔
2858
  int32_t  code = 0;
6,187✔
2859
  int32_t  lino = 0;
6,187✔
2860
  SEncoder encoder = {0};
6,187✔
2861
  void*    buf = NULL;
6,187✔
2862
  tEncodeSize(encodeFunc, rspObj, len, code);
6,187✔
2863
  RAW_FALSE_CHECK(code >= 0);
6,187✔
2864
  len += sizeof(int8_t) + sizeof(int32_t);
6,187✔
2865
  buf = taosMemoryCalloc(1, len);
6,187✔
2866
  RAW_NULL_CHECK(buf);
6,187✔
2867
  tEncoderInit(&encoder, buf, len);
6,187✔
2868
  RAW_RETURN_CHECK(tEncodeI8(&encoder, MQ_DATA_RSP_VERSION));
6,187✔
2869
  int32_t offsetLen = getOffSetLen(rspObj);
6,187✔
2870
  RAW_FALSE_CHECK(offsetLen > 0);
6,187✔
2871
  RAW_RETURN_CHECK(tEncodeI32(&encoder, offsetLen));
6,187✔
2872
  RAW_RETURN_CHECK(encodeFunc(&encoder, rspObj));
6,187✔
2873

2874
  raw->raw = buf;
6,187✔
2875
  buf = NULL;
6,187✔
2876
  raw->raw_len = len;
6,187✔
2877

2878
end:
6,187✔
2879
  RAW_LOG_END
6,187✔
2880
  return code;
6,187✔
2881
}
2882

2883
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
24,019✔
2884
  if (raw == NULL || res == NULL) {
24,019✔
2885
    uError("invalid parameter in %s", __func__);
×
2886
    return TSDB_CODE_INVALID_PARA;
×
2887
  }
2888
  int32_t code = TSDB_CODE_SUCCESS;
24,019✔
2889
  int32_t lino = 0;
24,019✔
2890
  RAW_LOG_START
24,019✔
2891
  *raw = (tmq_raw_data){0};
24,019✔
2892
  SMqRspObj* rspObj = ((SMqRspObj*)res);
24,019✔
2893
  if (TD_RES_TMQ_META(res)) {
24,019✔
2894
    raw->raw = rspObj->metaRsp.metaRsp;
16,427✔
2895
    raw->raw_len = rspObj->metaRsp.metaRspLen >= 0 ? rspObj->metaRsp.metaRspLen : 0;
16,427✔
2896
    raw->raw_type = rspObj->metaRsp.resMsgType;
16,427✔
2897
    uDebug("tmq get raw type meta:%p", raw);
16,427✔
2898
  } else if (TD_RES_TMQ(res)) {
7,592✔
2899
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw));
5,820✔
2900
    raw->raw_type = RES_TYPE__TMQ;
5,820✔
2901
    uDebug("tmq get raw type data:%p", raw);
5,820✔
2902
  } else if (TD_RES_TMQ_METADATA(res)) {
1,772✔
2903
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw));
367✔
2904
    raw->raw_type = RES_TYPE__TMQ_METADATA;
367✔
2905
    uDebug("tmq get raw type metadata:%p", raw);
367✔
2906
  } else if (TD_RES_TMQ_BATCH_META(res)) {
1,405✔
2907
    raw->raw = rspObj->batchMetaRsp.pMetaBuff;
1,405✔
2908
    raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
1,405✔
2909
    raw->raw_type = rspObj->resType;
1,405✔
2910
    uDebug("tmq get raw batch meta:%p", raw);
1,405✔
2911
  } else if (TD_RES_TMQ_RAW(res)) {
×
2912
    raw->raw = rspObj->dataRsp.rawData;
×
2913
    rspObj->dataRsp.rawData = NULL;
×
2914
    raw->raw_len = rspObj->dataRsp.len;
×
2915
    raw->raw_type = rspObj->resType;
×
2916
    uDebug("tmq get raw raw:%p", raw);
×
2917
  } else {
2918
    uError("tmq get raw error type:%d", *(int8_t*)res);
×
2919
    code = TSDB_CODE_TMQ_INVALID_MSG;
×
2920
  }
2921

2922
end:
24,019✔
2923
  RAW_LOG_END
24,019✔
2924
  return code;
24,019✔
2925
}
2926

2927
void tmq_free_raw(tmq_raw_data raw) {
23,983✔
2928
  uDebug("tmq free raw data type:%d", raw.raw_type);
23,983✔
2929
  if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
23,983✔
2930
    taosMemoryFree(raw.raw);
6,151✔
2931
  } else if (raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL) {
17,832✔
2932
    taosMemoryFree(POINTER_SHIFT(raw.raw, -sizeof(SMqRspHead)));
×
2933
  }
2934
  (void)memset(terrMsg, 0, ERR_MSG_LEN);
23,983✔
2935
}
23,983✔
2936

2937
static int32_t writeRawInit() {
31,471✔
2938
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
34,356✔
2939
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
2,885✔
2940
    if (old == 0) {
2,885✔
2941
      int32_t code = initRawCacheHash();
2,885✔
2942
      if (code != 0) {
2,885✔
2943
        uError("tmq writeRawImpl init error:%d", code);
×
2944
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
2945
        return code;
×
2946
      }
2947
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
2,885✔
2948
    }
2949
  }
2950

2951
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
31,471✔
2952
    return TSDB_CODE_INTERNAL_ERROR;
×
2953
  }
2954
  return 0;
31,471✔
2955
}
2956

2957
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
31,471✔
2958
  if (taos == NULL || buf == NULL) {
31,471✔
2959
    uError("invalid parameter in %s", __func__);
×
2960
    return TSDB_CODE_INVALID_PARA;
×
2961
  }
2962
  if (writeRawInit() != 0) {
31,471✔
2963
    return TSDB_CODE_INTERNAL_ERROR;
×
2964
  }
2965

2966
  if (type == TDMT_VND_CREATE_STB) {
31,471✔
2967
    return taosCreateStb(taos, buf, len);
4,675✔
2968
  } else if (type == TDMT_VND_ALTER_STB) {
26,796✔
2969
    return taosCreateStb(taos, buf, len);
×
2970
  } else if (type == TDMT_VND_DROP_STB) {
26,796✔
2971
    return taosDropStb(taos, buf, len);
×
2972
  } else if (type == TDMT_VND_CREATE_TABLE) {
26,796✔
2973
    return taosCreateTable(taos, buf, len);
13,012✔
2974
  } else if (type == TDMT_VND_ALTER_TABLE) {
13,784✔
2975
    return taosAlterTable(taos, buf, len);
6,228✔
2976
  } else if (type == TDMT_VND_DROP_TABLE) {
7,556✔
2977
    return taosDropTable(taos, buf, len);
×
2978
  } else if (type == TDMT_VND_DELETE) {
7,556✔
2979
    return taosDeleteData(taos, buf, len);
×
2980
  } else if (type == RES_TYPE__TMQ_METADATA) {
7,556✔
2981
    return tmqWriteRawMetaDataImpl(taos, buf, len);
367✔
2982
  } else if (type == RES_TYPE__TMQ_RAWDATA) {
7,189✔
2983
    return tmqWriteRawRawDataImpl(taos, buf, len);
×
2984
  } else if (type == RES_TYPE__TMQ) {
7,189✔
2985
    return tmqWriteRawDataImpl(taos, buf, len);
5,784✔
2986
  } else if (type == RES_TYPE__TMQ_BATCH_META) {
1,405✔
2987
    return tmqWriteBatchMetaDataImpl(taos, buf, len);
1,405✔
2988
  }
2989
  return TSDB_CODE_INVALID_PARA;
×
2990
}
2991

2992
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
23,983✔
2993
  if (taos == NULL || raw.raw == NULL || raw.raw_len <= 0) {
23,983✔
2994
    SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
×
2995
    return TSDB_CODE_INVALID_PARA;
×
2996
  }
2997
  taosClearErrMsg();  // clear global error message
23,983✔
2998

2999
  return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
23,983✔
3000
}
3001

3002
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) {
1,405✔
3003
  if (taos == NULL || meta == NULL) {
1,405✔
3004
    uError("invalid parameter in %s", __func__);
×
3005
    return TSDB_CODE_INVALID_PARA;
×
3006
  }
3007
  SMqBatchMetaRsp rsp = {0};
1,405✔
3008
  SDecoder        coder = {0};
1,405✔
3009
  int32_t         code = TSDB_CODE_SUCCESS;
1,405✔
3010
  int32_t         lino = 0;
1,405✔
3011

3012
  RAW_LOG_START
1,405✔
3013
  // decode and process req
3014
  tDecoderInit(&coder, meta, metaLen);
1,405✔
3015
  RAW_RETURN_CHECK(tDecodeMqBatchMetaRsp(&coder, &rsp));
1,405✔
3016
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
1,405✔
3017
  for (int32_t i = 0; i < num; i++) {
8,893✔
3018
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
7,488✔
3019
    RAW_NULL_CHECK(len);
7,488✔
3020
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
7,488✔
3021
    RAW_NULL_CHECK(tmpBuf);
7,488✔
3022
    SDecoder   metaCoder = {0};
7,488✔
3023
    SMqMetaRsp metaRsp = {0};
7,488✔
3024
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
7,488✔
3025
    RAW_RETURN_CHECK(tDecodeMqMetaRsp(&metaCoder, &metaRsp));
7,488✔
3026
    code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
7,488✔
3027
    tDeleteMqMetaRsp(&metaRsp);
7,488✔
3028
    if (code != TSDB_CODE_SUCCESS) {
7,488✔
3029
      goto end;
×
3030
    }
3031
  }
3032

3033
end:
1,405✔
3034
  tDeleteMqBatchMetaRsp(&rsp);
1,405✔
3035
  RAW_LOG_END
1,405✔
3036
  return code;
1,405✔
3037
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc