• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4998

21 Mar 2026 01:22PM UTC coverage: 72.335% (+0.6%) from 71.739%
#4998

push

travis-ci

web-flow
enh:register add secondEp (#34867)

61 of 69 new or added lines in 1 file covered. (88.41%)

8670 existing lines in 142 files now uncovered.

253516 of 350475 relevant lines covered (72.33%)

133451446.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.64
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <string.h>
17
#include "cJSON.h"
18
#include "clientInt.h"
19
#include "nodes.h"
20
#include "osMemPool.h"
21
#include "osMemory.h"
22
#include "parser.h"
23
#include "taosdef.h"
24
#include "tarray.h"
25
#include "tbase64.h"
26
#include "tcol.h"
27
#include "tcompression.h"
28
#include "tdatablock.h"
29
#include "tdataformat.h"
30
#include "tdef.h"
31
#include "tglobal.h"
32
#include "tmsgtype.h"
33

34
#define RAW_LOG_END                                                           \
35
  if (code != 0) {                                                            \
36
    uError("%s failed at line:%d since:%s", __func__, lino, tstrerror(code)); \
37
  } else {                                                                    \
38
    uDebug("%s return success", __func__);                                    \
39
  }
40

41
#define RAW_LOG_START uDebug("%s start", __func__);
42

43
// Error-propagation helpers. Each one expects the enclosing function to
// declare `int32_t code`, `int32_t lino` and an `end:` label.

// Jump to `end` when pointer expression `c` is NULL. The error code is taken
// from terrno; if terrno is 0 at that point a fallback code is used so that a
// NULL result can never be reported as success.
#define RAW_NULL_CHECK(c)                                      \
  do {                                                         \
    if ((c) == NULL) {                                         \
      lino = __LINE__;                                         \
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY; \
      goto end;                                                \
    }                                                          \
  } while (0)

// Jump to `end` with TSDB_CODE_INVALID_PARA when condition `c` is false.
#define RAW_FALSE_CHECK(c)           \
  do {                               \
    if (!(c)) {                      \
      code = TSDB_CODE_INVALID_PARA; \
      lino = __LINE__;               \
      goto end;                      \
    }                                \
  } while (0)

// Store the result of expression `c` in `code` and jump to `end` when it is
// non-zero.
#define RAW_RETURN_CHECK(c) \
  do {                      \
    code = (c);             \
    if (code != 0) {        \
      lino = __LINE__;      \
      goto end;             \
    }                       \
  } while (0)
69

70
#define LOG_ID_TAG   "connId:0x%" PRIx64 ", QID:0x%" PRIx64
71
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
72

73
#define TMQ_META_VERSION "1.0"
74

75
// Creates a new JSON object and appends it to `array`.
// Returns the appended object, or NULL when the append is rejected (in which
// case the freshly created object is released again).
static cJSON* tmqAddObjectToArray(cJSON* array) {
  cJSON* obj = cJSON_CreateObject();
  if (!cJSON_AddItemToArray(array, obj)) {
    cJSON_Delete(obj);
    return NULL;
  }
  return obj;
}
83

84
// Creates a JSON string item from `str` and appends it to `array`.
// Returns the appended item, or NULL when the append is rejected (the item is
// released in that case).
static cJSON* tmqAddStringToArray(cJSON* array, const char* str) {
  cJSON* strItem = cJSON_CreateString(str);
  if (!cJSON_AddItemToArray(array, strItem)) {
    cJSON_Delete(strItem);
    return NULL;
  }
  return strItem;
}
92
 
93
#define ADD_TO_JSON_STRING(JSON,NAME,VALUE) \
94
  RAW_NULL_CHECK(cJSON_AddStringToObject(JSON, NAME, VALUE));
95

96
#define ADD_TO_JSON_BOOL(JSON,NAME,VALUE) \
97
  RAW_NULL_CHECK(cJSON_AddBoolToObject(JSON, NAME, VALUE));
98

99
#define ADD_TO_JSON_NUMBER(JSON,NAME,VALUE) \
100
  RAW_NULL_CHECK(cJSON_AddNumberToObject(JSON, NAME, VALUE));
101

102
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen);
103
// Offsets `suid` by the 32-bit Murmur hash of the database name.
// A NULL `db` leaves `suid` unchanged.
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
  return (db != NULL) ? suid + MurmurHash3_32(db, strlen(db)) : suid;
}
109

110
static int32_t getLength(int8_t type, int32_t bytes, int32_t typeMod) {
105,464✔
111
  int32_t length = 0;
105,464✔
112
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_GEOMETRY) {
105,464✔
113
    length = bytes - VARSTR_HEADER_SIZE;
15,379✔
114
  } else if (type == TSDB_DATA_TYPE_NCHAR) {
90,085✔
115
    length = (bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
8,599✔
116
  } else if (IS_STR_DATA_BLOB(type)) {
81,486✔
UNCOV
117
    length = bytes - BLOBSTR_HEADER_SIZE;
×
118
  } else if (type == TSDB_DATA_TYPE_DECIMAL || type == TSDB_DATA_TYPE_DECIMAL64) {
81,486✔
119
    length = typeMod;
366✔
120
  }
121
  return length;
105,464✔
122
}
123

124
// Builds the "create" JSON description of a super or normal table.
//
// schemaRow   column schema (required)
// schemaTag   tag schema; may be NULL, in which case "tags" is an empty array
// pExtSchemas optional per-column extended schema, indexed like schemaRow
// name        table name (required)
// id          not used by this function
// t           TSDB_SUPER_TABLE yields tableType "super", anything else "normal"
// isVirtual   when true, "isVirtual" is emitted and column refs are taken from colRef
// colRef      optional column references (only read when isVirtual)
// pColCmprRow per-column compression info (required); when it has no entries, or
//             fewer entries than schemaRow, the type's default compression is used
// pJson       receives the JSON object built so far, also on failure
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for missing required arguments,
// or the error code of the first failing step.
static int32_t buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, SExtSchema* pExtSchemas, char* name, int64_t id, int8_t t,
                                 bool isVirtual, SColRefWrapper* colRef, SColCmprWrapper* pColCmprRow, cJSON** pJson) {
  if (schemaRow == NULL || name == NULL || pColCmprRow == NULL || pJson == NULL) {
    uError("invalid parameter, schemaRow:%p, name:%p, pColCmprRow:%p, pJson:%p", schemaRow, name, pColCmprRow, pJson);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int8_t  buildDefaultCompress = 0;
  if (pColCmprRow->nCols <= 0) {
    buildDefaultCompress = 1;
  }
  RAW_LOG_START
  cJSON* json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);

  ADD_TO_JSON_STRING(json, "type", "create");
  ADD_TO_JSON_STRING(json, "tableType", (t == TSDB_SUPER_TABLE ? "super" : "normal"));
  if (isVirtual){
    ADD_TO_JSON_BOOL(json, "isVirtual", isVirtual);
  }
  ADD_TO_JSON_STRING(json, "tableName", name);

  cJSON* columns = cJSON_AddArrayToObject(json, "columns");
  RAW_NULL_CHECK(columns);

  for (int i = 0; i < schemaRow->nCols; i++) {
    cJSON* column = tmqAddObjectToArray(columns);
    RAW_NULL_CHECK(column);
    SSchema* s = schemaRow->pSchema + i;
    ADD_TO_JSON_STRING(column, "name", s->name);
    ADD_TO_JSON_NUMBER(column, "type", s->type);
    int32_t typeMod = 0;
    if (pExtSchemas != NULL) {
      typeMod = pExtSchemas[i].typeMod;
    }
    int32_t length = getLength(s->type, s->bytes, typeMod);
    if (length > 0) {
      ADD_TO_JSON_NUMBER(column, "length", length);
    }

    if (isVirtual && colRef != NULL && i < colRef->nCols){
      SColRef* pColRef = colRef->pColRef + i;
      if (pColRef->hasRef) {
        cJSON* ref = cJSON_AddObjectToObject(column, "ref");
        RAW_NULL_CHECK(ref);
        ADD_TO_JSON_STRING(ref, "refDbName", pColRef->refDbName);
        ADD_TO_JSON_STRING(ref, "refTableName", pColRef->refTableName);
        ADD_TO_JSON_STRING(ref, "refColName", pColRef->refColName);
      }
    }
    ADD_TO_JSON_BOOL(column, "isPrimarykey", (s->flags & COL_IS_KEY));

    // Never index past the compression array: columns without an entry get
    // the default algorithm for their type.
    uint32_t alg = 0;
    if (buildDefaultCompress || i >= pColCmprRow->nCols) {
      alg = createDefaultColCmprByType(s->type);
    } else {
      SColCmpr* pColCmpr = pColCmprRow->pColCmpr + i;
      alg = pColCmpr->alg;
    }
    const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(alg));
    RAW_NULL_CHECK(encode);
    const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(alg));
    RAW_NULL_CHECK(compress);
    const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(alg));
    RAW_NULL_CHECK(level);

    ADD_TO_JSON_STRING(column, "encode", encode);
    ADD_TO_JSON_STRING(column, "compress", compress);
    ADD_TO_JSON_STRING(column, "level", level);
  }

  cJSON* tags = cJSON_AddArrayToObject(json, "tags");
  RAW_NULL_CHECK(tags);

  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
    cJSON* tag = tmqAddObjectToArray(tags);
    RAW_NULL_CHECK(tag);
    SSchema* s = schemaTag->pSchema + i;
    ADD_TO_JSON_STRING(tag, "name", s->name);
    ADD_TO_JSON_NUMBER(tag, "type", s->type);
    int32_t length = getLength(s->type, s->bytes, 0);
    if (length > 0) {
      ADD_TO_JSON_NUMBER(tag, "length", length);
    }
  }

end:
  // The object is handed out even on failure (it may then be partial or NULL).
  *pJson = json;
  RAW_LOG_END
  return code;
}
220

221
// Adds exactly one compression attribute to `json`, chosen from the packed
// value `para` in priority order: "encode", then "compress", then "level".
// The first non-zero component wins; if all are zero nothing is added.
// Returns TSDB_CODE_INVALID_PARA for a NULL json, otherwise 0 or the error of
// the failing step.
static int32_t setCompressOption(cJSON* json, uint32_t para) {
  if (json == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  const uint8_t l1Encode = COMPRESS_L1_TYPE_U32(para);
  const uint8_t l2Compress = COMPRESS_L2_TYPE_U32(para);
  const uint8_t l2Level = COMPRESS_L2_TYPE_LEVEL_U32(para);
  int32_t       lino = 0;
  int32_t       code = 0;
  RAW_LOG_START

  if (l1Encode != 0) {
    const char* text = columnEncodeStr(l1Encode);
    RAW_NULL_CHECK(text);
    ADD_TO_JSON_STRING(json, "encode", text);
  } else if (l2Compress != 0) {
    const char* text = columnCompressStr(l2Compress);
    RAW_NULL_CHECK(text);
    ADD_TO_JSON_STRING(json, "compress", text);
  } else if (l2Level != 0) {
    const char* text = columnLevelStr(l2Level);
    RAW_NULL_CHECK(text);
    ADD_TO_JSON_STRING(json, "level", text);
  }

end:
  RAW_LOG_END
  return code;
}
254
// Deserializes an SMAlterStbReq from `alterData` and renders it as an "alter"
// JSON object for a super table ("type", "tableType", "tableName", "alterType"
// plus alter-type specific column attributes).
//
// alterData / alterDataLen  serialized request
// pJson                     receives the JSON object built so far; it is NULL
//                           when the request carries an unexpected number of
//                           fields, and may be partial on other failures
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for bad arguments or an
// unexpected field count, or the error code of the first failing step.
static int32_t buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) {
  if (alterData == NULL || pJson == NULL) {
    uError("invalid parameter in %s alterData:%p", __func__, alterData);
    return TSDB_CODE_INVALID_PARA;
  }
  SMAlterStbReq req = {0};
  cJSON*        json = NULL;
  int32_t       code = 0;
  int32_t       lino = 0;
  RAW_LOG_START
  RAW_RETURN_CHECK(tDeserializeSMAlterStbReq(alterData, alterDataLen, &req));
  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  ADD_TO_JSON_STRING(json, "type", "alter");
  SName name = {0};
  RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
  ADD_TO_JSON_STRING(json, "tableType", "super");
  ADD_TO_JSON_STRING(json, "tableName", name.tname);
  ADD_TO_JSON_NUMBER(json, "alterType", req.alterType);

  switch (req.alterType) {
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      if (taosArrayGetSize(req.pFields) != 1) {
        uError("invalid field num %" PRIzu " for alter type %d", taosArrayGetSize(req.pFields), req.alterType);
        cJSON_Delete(json);
        json = NULL;
        code = TSDB_CODE_INVALID_PARA;
        lino = __LINE__;  // keep the failure log pointing at this check
        goto end;
      }
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      ADD_TO_JSON_STRING(json, "colName", field->name);
      ADD_TO_JSON_NUMBER(json, "colType", field->type);
      int32_t typeMode = 0;
      if (taosArrayGetSize(req.pTypeMods) > 0) {
        typeMode = *(STypeMod*)taosArrayGet(req.pTypeMods, 0);
      }
      int32_t length = getLength(field->type, field->bytes, typeMode);
      if (length > 0) {
        ADD_TO_JSON_NUMBER(json, "colLength", length);
      }

      break;
    }
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      SFieldWithOptions* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      ADD_TO_JSON_STRING(json, "colName", field->name);
      ADD_TO_JSON_NUMBER(json, "colType", field->type);
      int32_t typeMode = 0;
      if (taosArrayGetSize(req.pTypeMods) > 0) {
        typeMode = *(STypeMod*)taosArrayGet(req.pTypeMods, 0);
      }
      int32_t length = getLength(field->type, field->bytes, typeMode);
      if (length > 0) {
        ADD_TO_JSON_NUMBER(json, "colLength", length);
      }

      RAW_RETURN_CHECK(setCompressOption(json, field->compress));
      break;
    }
    case TSDB_ALTER_TABLE_DROP_TAG:
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      ADD_TO_JSON_STRING(json, "colName", field->name);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      if (taosArrayGetSize(req.pFields) != 1) {
        uError("invalid field num %" PRIzu " for alter type %d", taosArrayGetSize(req.pFields), req.alterType);
        cJSON_Delete(json);
        json = NULL;
        code = TSDB_CODE_INVALID_PARA;
        lino = __LINE__;  // keep the failure log pointing at this check
        goto end;
      }
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      ADD_TO_JSON_STRING(json, "colName", field->name);
      ADD_TO_JSON_NUMBER(json, "colType", field->type);
      int32_t length = getLength(field->type, field->bytes, 0);
      if (length > 0) {
        ADD_TO_JSON_NUMBER(json, "colLength", length);
      }

      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      // field 0 carries the current name, field 1 the new one
      TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(oldField);
      TAOS_FIELD* newField = taosArrayGet(req.pFields, 1);
      RAW_NULL_CHECK(newField);
      ADD_TO_JSON_STRING(json, "colName", oldField->name);
      ADD_TO_JSON_STRING(json, "colNewName", newField->name);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(field);
      ADD_TO_JSON_STRING(json, "colName", field->name);
      // NOTE(review): the packed compress option is read from field->bytes here — confirm against the producer
      RAW_RETURN_CHECK(setCompressOption(json, field->bytes));
      break;
    }
    default:
      break;
  }

end:
  tFreeSMAltertbReq(&req);
  *pJson = json;
  RAW_LOG_END
  return code;
}
371

372
// Decodes a create-super-table request from the meta response payload (the
// bytes following the SMsgHead) and converts it to JSON via
// buildCreateTableJson(). Returns 0 on success or the first error code.
static int32_t processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (!(metaRsp != NULL && pJson != NULL)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDecoder       decoder = {0};
  SVCreateStbReq stbReq = {0};
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;

  RAW_LOG_START
  // the request body starts right after the message head
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, payload, payloadLen);

  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&decoder, &stbReq));
  RAW_RETURN_CHECK(buildCreateTableJson(&stbReq.schemaRow, &stbReq.schemaTag, stbReq.pExtSchemas, stbReq.name,
                                        stbReq.suid, TSDB_SUPER_TABLE, stbReq.virtualStb, NULL, &stbReq.colCmpr,
                                        pJson));

end:
  tDecoderClear(&decoder);
  RAW_LOG_END
  return code;
}
397

398
// Decodes the super-table request carried in the meta response payload (the
// bytes following the SMsgHead) and converts its original alter data to JSON
// via buildAlterSTableJson(). Returns 0 on success or the first error code.
static int32_t processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (!(metaRsp != NULL && pJson != NULL)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;
  SDecoder       decoder = {0};
  SVCreateStbReq stbReq = {0};
  RAW_LOG_START

  // the request body starts right after the message head
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, payload, payloadLen);

  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&decoder, &stbReq));
  RAW_RETURN_CHECK(buildAlterSTableJson(stbReq.alterOriData, stbReq.alterOriDataLen, pJson));

end:
  tDecoderClear(&decoder);
  RAW_LOG_END
  return code;
}
422

423
// Fills `json` with the description of one child table taken from
// `pCreateReq`: "tableName", optional "refs" (virtual child tables only),
// "using", "tagNum" and the "tags" array.
//
// For a JSON tag a single entry of type TSDB_DATA_TYPE_JSON is emitted (none
// when the tag is empty). Otherwise one entry per tag value is emitted;
// variable-length values are converted to text, all others to a number.
// Processing stops silently at the first blob-typed tag value.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
// error code of the first failing step.
static int32_t buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
  if (json == NULL || pCreateReq == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  STag*   pTag = (STag*)pCreateReq->ctb.pTag;
  char*   sname = pCreateReq->ctb.stbName;
  char*   name = pCreateReq->name;
  SArray* tagName = pCreateReq->ctb.tagName;
  uint8_t tagNum = pCreateReq->ctb.tagNum;
  int32_t code = 0;
  int32_t lino = 0;
  SArray* pTagVals = NULL;
  char*   pJson = NULL;
  char*   buf = NULL;
  RAW_LOG_START

  ADD_TO_JSON_STRING(json, "tableName", name);

  if (pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) {
    cJSON* refs = cJSON_AddArrayToObject(json, "refs");
    RAW_NULL_CHECK(refs);

    for (int i = 0; i < pCreateReq->colRef.nCols; i++) {
      SColRef* pColRef = pCreateReq->colRef.pColRef + i;

      // only columns that actually reference another column are listed
      if (!pColRef->hasRef) {
        continue;
      }
      cJSON* ref = tmqAddObjectToArray(refs);
      RAW_NULL_CHECK(ref);
      ADD_TO_JSON_STRING(ref, "colName", pColRef->colName);
      ADD_TO_JSON_STRING(ref, "refDbName", pColRef->refDbName);
      ADD_TO_JSON_STRING(ref, "refTableName", pColRef->refTableName);
      ADD_TO_JSON_STRING(ref, "refColName", pColRef->refColName);
    }
  }

  ADD_TO_JSON_STRING(json, "using", sname);
  ADD_TO_JSON_NUMBER(json, "tagNum", tagNum);

  cJSON* tags = cJSON_AddArrayToObject(json, "tags");
  RAW_NULL_CHECK(tags);
  RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals));
  if (tTagIsJson(pTag)) {
    if (pTag->nTag == 0) {
      uWarn("p->nTag == 0");
      goto end;
    }
    parseTagDatatoJson(pTag, &pJson, NULL);
    RAW_NULL_CHECK(pJson);
    cJSON* tag = tmqAddObjectToArray(tags);
    RAW_NULL_CHECK(tag);
    STagVal* pTagVal = taosArrayGet(pTagVals, 0);
    RAW_NULL_CHECK(pTagVal);
    char* ptname = taosArrayGet(tagName, 0);
    RAW_NULL_CHECK(ptname);
    ADD_TO_JSON_STRING(tag, "name", ptname);
    ADD_TO_JSON_NUMBER(tag, "type", TSDB_DATA_TYPE_JSON);
    ADD_TO_JSON_STRING(tag, "value", pJson);
    goto end;
  }

  // pTagVals is not modified below, so its size is read once
  const int32_t nTagVals = (int32_t)taosArrayGetSize(pTagVals);
  for (int32_t i = 0; i < nTagVals; i++) {
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
    RAW_NULL_CHECK(pTagVal);
    cJSON* tag = tmqAddObjectToArray(tags);
    RAW_NULL_CHECK(tag);
    char* ptname = taosArrayGet(tagName, i);
    RAW_NULL_CHECK(ptname);
    ADD_TO_JSON_STRING(tag, "name", ptname);
    ADD_TO_JSON_NUMBER(tag, "type", pTagVal->type);

    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      if (IS_STR_DATA_BLOB(pTagVal->type)) {
        goto end;
      }
      int64_t bufSize = 0;
      if (pTagVal->type == TSDB_DATA_TYPE_VARBINARY) {
        bufSize = pTagVal->nData * 2 + 2 + 3;
      } else {
        bufSize = pTagVal->nData + 3;
      }
      buf = taosMemoryCalloc(bufSize, 1);
      RAW_NULL_CHECK(buf);
      code = dataConverToStr(buf, bufSize, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        uError("convert tag value to string failed");
        lino = __LINE__;  // so the failure log does not report line 0
        goto end;
      }

      ADD_TO_JSON_STRING(tag, "value", buf);
      taosMemoryFreeClear(buf);
    } else {
      double val = 0;
      GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64, 0);  // currently tag type can't be decimal, so pass 0 as typeMod
      ADD_TO_JSON_NUMBER(tag, "value", val);
    }
  }

end:
  taosMemoryFree(pJson);
  taosArrayDestroy(pTagVals);
  taosMemoryFree(buf);
  RAW_LOG_END
  return code;
}
532

533
// Builds the "create" JSON description for one or more child tables.
//
// The first request is always rendered at the top level of the object. When
// nReqs > 1, every request (including the first) is additionally rendered as
// an element of the "createList" array; otherwise "createList" stays empty.
//
// pJson receives the JSON object built so far, also on failure.
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
// error code of the first failing step.
static int32_t buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) {
  if (pJson == NULL || pCreateReq == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  RAW_LOG_START
  cJSON* json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  ADD_TO_JSON_STRING(json, "type", "create");
  ADD_TO_JSON_STRING(json, "tableType", "child");

  RAW_RETURN_CHECK(buildChildElement(json, pCreateReq));
  cJSON* createList = cJSON_AddArrayToObject(json, "createList");
  RAW_NULL_CHECK(createList);

  if (nReqs > 1) {
    for (int32_t i = 0; i < nReqs; i++) {
      cJSON* create = tmqAddObjectToArray(createList);
      RAW_NULL_CHECK(create);
      RAW_RETURN_CHECK(buildChildElement(create, pCreateReq + i));
    }
  }

end:
  *pJson = json;
  RAW_LOG_END
  return code;
}
562

563
// Decodes a batch create-table request from the meta response payload (the
// bytes following the SMsgHead) and converts it to JSON. The type of the first
// request selects the builder: child / virtual child tables go through
// buildCreateCTableJson(), normal / virtual normal tables through
// buildCreateTableJson(). An empty batch or any other type leaves *pJson
// untouched. Returns 0 on success or the first error code.
static int32_t processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (!(pJson != NULL && metaRsp != NULL)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateTbBatchReq batchReq = {0};
  SDecoder           decoder = {0};
  int32_t            lino = 0;
  int32_t            code = TSDB_CODE_SUCCESS;
  RAW_LOG_START

  // the request body starts right after the message head
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, payload, payloadLen);
  RAW_RETURN_CHECK(tDecodeSVCreateTbBatchReq(&decoder, &batchReq));

  if (batchReq.nReqs <= 0) {
    goto end;
  }

  SVCreateTbReq* first = batchReq.pReqs;
  if (first->type == TSDB_CHILD_TABLE || first->type == TSDB_VIRTUAL_CHILD_TABLE) {
    RAW_RETURN_CHECK(buildCreateCTableJson(batchReq.pReqs, batchReq.nReqs, pJson));
  } else if (first->type == TSDB_NORMAL_TABLE || first->type == TSDB_VIRTUAL_NORMAL_TABLE) {
    // within this branch, anything that is not a plain normal table is virtual
    bool isVirtual = (first->type != TSDB_NORMAL_TABLE);
    RAW_RETURN_CHECK(buildCreateTableJson(&first->ntb.schemaRow, NULL, first->pExtSchemas, first->name, first->uid,
                                          first->type, isVirtual, &first->colRef, &first->colCmpr, pJson));
  }

end:
  tDeleteSVCreateTbBatchReq(&batchReq);
  tDecoderClear(&decoder);
  RAW_LOG_END
  return code;
}
597

598
// Converts the auto-create-table requests attached to a data response into an
// unformatted JSON string.
//
// rsp     data response; createTableNum must be > 0
// string  receives the result of cJSON_PrintUnformatted() on success
//
// Every request is decoded first and must be a child or a normal table. The
// type of the first request then selects the builder: a normal table is
// rendered alone, child tables are rendered as a batch.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for bad arguments or an
// unsupported table type, or the error code of the first failing step.
static int32_t processAutoCreateTable(SMqDataRsp* rsp, char** string) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  // Everything the cleanup section touches is declared and initialized before
  // the first check that can jump to `end`; a goto that skips an initializer
  // would leave these pointers indeterminate when they are tested and freed.
  SDecoder*      decoder = NULL;
  SVCreateTbReq* pCreateReq = NULL;
  cJSON*         pJson = NULL;
  RAW_LOG_START
  RAW_FALSE_CHECK(rsp != NULL && string != NULL);
  RAW_FALSE_CHECK(rsp->createTableNum > 0);

  decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
  RAW_NULL_CHECK(decoder);
  pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
  RAW_NULL_CHECK(pCreateReq);

  // loop to create table
  for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) {
    // decode
    void** data = taosArrayGet(rsp->createTableReq, iReq);
    RAW_NULL_CHECK(data);
    int32_t* len = taosArrayGet(rsp->createTableLen, iReq);
    RAW_NULL_CHECK(len);
    tDecoderInit(&decoder[iReq], *data, *len);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq));

    if (pCreateReq[iReq].type != TSDB_CHILD_TABLE && pCreateReq[iReq].type != TSDB_NORMAL_TABLE) {
      uError("%s failed. pCreateReq[iReq].type:%d invalid", __func__, pCreateReq[iReq].type);
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }
  }

  if (pCreateReq->type == TSDB_NORMAL_TABLE) {
    RAW_RETURN_CHECK(buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->pExtSchemas, pCreateReq->name,
                                          pCreateReq->uid, TSDB_NORMAL_TABLE, false, NULL, &pCreateReq->colCmpr, &pJson));
  } else if (pCreateReq->type == TSDB_CHILD_TABLE) {
    RAW_RETURN_CHECK(buildCreateCTableJson(pCreateReq, rsp->createTableNum, &pJson));
  }

  *string = cJSON_PrintUnformatted(pJson);

  uDebug("auto created table return, sql json:%s", *string);

end:
  RAW_LOG_END
  // The builders hand out a (possibly partial) object even when they fail, so
  // it is released here on every path.
  cJSON_Delete(pJson);
  for (int i = 0; decoder && pCreateReq && i < rsp->createTableNum; i++) {
    tDecoderClear(&decoder[i]);
    taosMemoryFreeClear(pCreateReq[i].comment);
    if (pCreateReq[i].type == TSDB_CHILD_TABLE) {
      taosArrayDestroy(pCreateReq[i].ctb.tagName);
    }
  }
  taosMemoryFree(decoder);
  taosMemoryFree(pCreateReq);
  return code;
}
654

655
// Decodes the SVAlterTbReq that follows the SMsgHead in metaRsp->metaRsp and
// renders it as a cJSON object describing the alter operation.
//
// The object always carries "type" ("alter"), "tableType", "tableName" and
// "alterType"; the remaining members depend on vAlterTbReq.action (see the
// switch below). Actions without a dedicated case add nothing further.
//
// metaRsp  meta response holding the encoded request; must not be NULL.
// pJson    receives the object that was built. It is assigned on every path
//          that reaches `end`, so on failure it may be NULL or only partially
//          populated; this function does not delete it in that case.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments or an empty
// table/tag list, otherwise the code set by the failing step.
static int32_t processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  cJSON*       json = NULL;
  int32_t      code = 0;
  int32_t      lino = 0;
  char*        buf1 = NULL;  // scratch text buffer; released at `end` if still set
  char*        buf2 = NULL;  // private copy of the serialized where-condition
  SNode*       pWhere = NULL;

  RAW_LOG_START

  // decode: the request body starts right after the message head
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  RAW_RETURN_CHECK(tDecodeSVAlterTbReq(&decoder, &vAlterTbReq));

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  ADD_TO_JSON_STRING(json, "type", "alter");

  const char* tableType = NULL;
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL) {
    tableType = "child";
  } else if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL) {
    tableType = "super";
  } else if (vAlterTbReq.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF ||
             vAlterTbReq.action == TSDB_ALTER_TABLE_REMOVE_COLUMN_REF) {
    tableType = "";
  } else {
    tableType = "normal";
  }

  ADD_TO_JSON_STRING(json, "tableType", tableType);
  // the multi-table variant names its tables per entry inside "tables"
  ADD_TO_JSON_STRING(json, "tableName",
                     vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL ? "" : vAlterTbReq.tbName);
  ADD_TO_JSON_NUMBER(json, "alterType", vAlterTbReq.action);

  uDebug("alter table action:%d", vAlterTbReq.action);
  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN:
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      ADD_TO_JSON_NUMBER(json, "colType", vAlterTbReq.type);

      int32_t length = getLength(vAlterTbReq.type, vAlterTbReq.bytes, vAlterTbReq.typeMod);
      if (length > 0) {
        ADD_TO_JSON_NUMBER(json, "colLength", length);
      }

      if (vAlterTbReq.action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF) {
        ADD_TO_JSON_STRING(json, "refDbName", vAlterTbReq.refDbName);
        ADD_TO_JSON_STRING(json, "refTbName", vAlterTbReq.refTbName);
        ADD_TO_JSON_STRING(json, "refColName", vAlterTbReq.refColName);
      }
      break;
    }
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      ADD_TO_JSON_NUMBER(json, "colType", vAlterTbReq.type);

      int32_t length = getLength(vAlterTbReq.type, vAlterTbReq.bytes, vAlterTbReq.typeMod);
      if (length > 0) {
        ADD_TO_JSON_NUMBER(json, "colLength", length);
      }
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      ADD_TO_JSON_NUMBER(json, "colType", vAlterTbReq.colModType);
      int32_t length = getLength(vAlterTbReq.colModType, vAlterTbReq.colModBytes, vAlterTbReq.typeMod);
      if (length > 0) {
        ADD_TO_JSON_NUMBER(json, "colLength", length);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      ADD_TO_JSON_STRING(json, "colNewName", vAlterTbReq.colNewName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL: {
      int32_t nTables = taosArrayGetSize(vAlterTbReq.tables);
      if (nTables <= 0) {
        code = TSDB_CODE_INVALID_PARA;
        uError("processAlterTable parse multi tables error");
        goto end;
      }

      cJSON* tables = cJSON_AddArrayToObject(json, "tables");
      RAW_NULL_CHECK(tables);

      for (int32_t i = 0; i < nTables; i++) {
        SUpdateTableTagVal* pTable = taosArrayGet(vAlterTbReq.tables, i);
        RAW_NULL_CHECK(pTable);
        cJSON* tableObj = tmqAddObjectToArray(tables);
        RAW_NULL_CHECK(tableObj);

        ADD_TO_JSON_STRING(tableObj, "tableName", pTable->tbName);

        int32_t nTags = taosArrayGetSize(pTable->tags);
        cJSON*  tags = cJSON_AddArrayToObject(tableObj, "tags");
        RAW_NULL_CHECK(tags);

        for (int32_t j = 0; j < nTags; j++) {
          cJSON* member = tmqAddObjectToArray(tags);
          RAW_NULL_CHECK(member);

          SUpdatedTagVal* pTagVal = taosArrayGet(pTable->tags, j);
          RAW_NULL_CHECK(pTagVal);
          ADD_TO_JSON_STRING(member, "colName", pTagVal->tagName);

          if (pTagVal->regexp != NULL) {
            // regexp form: no literal value, only pattern and replacement
            ADD_TO_JSON_STRING(member, "regexp", pTagVal->regexp);
            ADD_TO_JSON_STRING(member, "replacement", pTagVal->replacement);
          } else {
            bool isNull = pTagVal->isNull;
            if (!isNull) {
              int64_t bufSize = 0;
              if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
                // larger buffer for varbinary: twice the value length plus a fixed margin
                bufSize = pTagVal->nTagVal * 2 + 2 + 3;
              } else {
                bufSize = pTagVal->nTagVal + 3;
              }
              buf1 = taosMemoryCalloc(bufSize, 1);
              RAW_NULL_CHECK(buf1);
              code = dataConverToStr(buf1, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL);
              if (code != TSDB_CODE_SUCCESS) {
                uError("convert tag value to string failed");
                goto end;
              }
              ADD_TO_JSON_STRING(member, "colValue", buf1);
              taosMemoryFreeClear(buf1);
            }
            ADD_TO_JSON_BOOL(member, "colValueNull", isNull);
          }
        }
      }
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL: {
      int32_t nTags = taosArrayGetSize(vAlterTbReq.pMultiTag);
      if (nTags <= 0) {
        code = TSDB_CODE_INVALID_PARA;
        uError("processAlterTable parse multi tags error");
        goto end;
      }

      cJSON* tags = cJSON_AddArrayToObject(json, "tags");
      RAW_NULL_CHECK(tags);

      for (int32_t i = 0; i < nTags; i++) {
        cJSON* member = tmqAddObjectToArray(tags);
        RAW_NULL_CHECK(member);

        SUpdatedTagVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
        RAW_NULL_CHECK(pTagVal);
        ADD_TO_JSON_STRING(member, "colName", pTagVal->tagName);

        if (pTagVal->regexp != NULL) {
          ADD_TO_JSON_STRING(member, "regexp", pTagVal->regexp);
          ADD_TO_JSON_STRING(member, "replacement", pTagVal->replacement);
        } else {
          bool isNull = pTagVal->isNull;
          if (!isNull) {
            int64_t bufSize = 0;
            if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
              bufSize = pTagVal->nTagVal * 2 + 2 + 3;
            } else {
              bufSize = pTagVal->nTagVal + 3;
            }
            buf1 = taosMemoryCalloc(bufSize, 1);
            RAW_NULL_CHECK(buf1);
            code = dataConverToStr(buf1, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL);
            if (code != TSDB_CODE_SUCCESS) {
              uError("convert tag value to string failed");
              goto end;
            }
            ADD_TO_JSON_STRING(member, "colValue", buf1);
            taosMemoryFreeClear(buf1);
          }
          ADD_TO_JSON_BOOL(member, "colValueNull", isNull);
        }
      }

      if (vAlterTbReq.whereLen > 0) {
        buf1 = taosMemoryCalloc(vAlterTbReq.whereLen, 1);
        RAW_NULL_CHECK(buf1);
        buf2 = taosMemoryCalloc(vAlterTbReq.whereLen, 1);
        RAW_NULL_CHECK(buf2);
        // nodesMsgToNode is handed a private copy of the serialized condition
        memcpy(buf2, vAlterTbReq.where, vAlterTbReq.whereLen);
        RAW_RETURN_CHECK(nodesMsgToNode(buf2, vAlterTbReq.whereLen, &pWhere));
        int32_t tlen = 0;
        RAW_RETURN_CHECK(nodesNodeToSQL(pWhere, buf1, vAlterTbReq.whereLen, &tlen));
        // Drop the last and the first character of the generated text.
        // NOTE(review): presumably the enclosing parentheses emitted by
        // nodesNodeToSQL, and tlen is presumably the text length -- confirm.
        if (tlen >= 1) buf1[tlen - 1] = 0;
        ADD_TO_JSON_STRING(json, "where", buf1 + 1);
        taosMemoryFreeClear(buf1);
        taosMemoryFreeClear(buf2);
      }
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      break;
    }
    case TSDB_ALTER_TABLE_ALTER_COLUMN_REF: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      ADD_TO_JSON_STRING(json, "refDbName", vAlterTbReq.refDbName);
      ADD_TO_JSON_STRING(json, "refTbName", vAlterTbReq.refTbName);
      ADD_TO_JSON_STRING(json, "refColName", vAlterTbReq.refColName);
      break;
    }
    case TSDB_ALTER_TABLE_REMOVE_COLUMN_REF: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      break;
    }
    default:
      break;
  }

end:
  nodesDestroyNode(pWhere);
  destroyAlterTbReq(&vAlterTbReq);
  tDecoderClear(&decoder);
  taosMemoryFree(buf1);
  taosMemoryFree(buf2);
  *pJson = json;
  RAW_LOG_END
  return code;
}
897

UNCOV
898
// Turns a drop-super-table meta message into a JSON description.
//
// The SVDropStbReq is decoded from the bytes that follow the SMsgHead in
// metaRsp->metaRsp; the resulting object has "type":"drop",
// "tableType":"super" and "tableName" set to the decoded name.
//
// *pJson is assigned whatever was built (possibly NULL or incomplete when an
// error code is returned). Returns 0 on success.
static int32_t processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t      code = 0;
  int32_t      lino = 0;
  cJSON*       root = NULL;
  SVDropStbReq dropReq = {0};
  SDecoder     reader = {0};
  RAW_LOG_START

  // skip the message head, then decode the request body
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  tDecoderInit(&reader, body, bodyLen);
  RAW_RETURN_CHECK(tDecodeSVDropStbReq(&reader, &dropReq));

  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);
  ADD_TO_JSON_STRING(root, "type", "drop");
  ADD_TO_JSON_STRING(root, "tableType", "super");
  ADD_TO_JSON_STRING(root, "tableName", dropReq.name);

end:
  tDecoderClear(&reader);
  *pJson = root;
  RAW_LOG_END
  return code;
}
928
// Format of the statement emitted for a decoded SDeleteRes.
#define RAW_DELETE_SQL_FMT "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64

// Turns a delete meta message into a JSON object of the form
// {"type":"delete","sql":"delete from `tb` where `ts` >= skey and `ts` <= ekey"}.
//
// The SDeleteRes is decoded from the bytes that follow the SMsgHead in
// metaRsp->metaRsp. The statement buffer is sized from the formatted length,
// so long table or column names are never truncated into malformed SQL.
//
// *pJson is assigned whatever was built (possibly NULL or incomplete when an
// error code is returned). Returns 0 on success.
static int32_t processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  cJSON*     json = NULL;
  char*      sql = NULL;
  int32_t    code = 0;
  int32_t    lino = 0;
  RAW_LOG_START

  // decode and process req
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);

  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeDeleteRes(&coder, &req));

  // First pass only measures; snprintf(NULL, 0, ...) returns the length the
  // text would have, excluding the terminating NUL.
  int32_t sqlLen =
      snprintf(NULL, 0, RAW_DELETE_SQL_FMT, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  RAW_FALSE_CHECK(sqlLen > 0);
  sql = taosMemoryCalloc((int64_t)sqlLen + 1, 1);
  RAW_NULL_CHECK(sql);
  (void)snprintf(sql, (size_t)sqlLen + 1, RAW_DELETE_SQL_FMT, req.tableFName, req.tsColName, req.skey, req.tsColName,
                 req.ekey);

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  ADD_TO_JSON_STRING(json, "type", "delete");
  ADD_TO_JSON_STRING(json, "sql", sql);

end:
  taosMemoryFree(sql);
  tDecoderClear(&coder);
  *pJson = json;
  RAW_LOG_END
  return code;
}

#undef RAW_DELETE_SQL_FMT
962

963
// Turns a batched drop-table meta message into a JSON description.
//
// The SVDropTbBatchReq is decoded from the bytes that follow the SMsgHead in
// metaRsp->metaRsp. The result is {"type":"drop","tableNameList":[...]} with
// one string per request in the batch, in batch order.
//
// *pJson is assigned whatever was built (possibly NULL or incomplete when an
// error code is returned). Returns 0 on success.
static int32_t processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  int32_t          lino = 0;
  cJSON*           root = NULL;
  SVDropTbBatchReq batch = {0};
  SDecoder         reader = {0};
  RAW_LOG_START

  // skip the message head, then decode the batch
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  tDecoderInit(&reader, body, bodyLen);
  RAW_RETURN_CHECK(tDecodeSVDropTbBatchReq(&reader, &batch));

  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);
  ADD_TO_JSON_STRING(root, "type", "drop");

  cJSON* names = cJSON_AddArrayToObject(root, "tableNameList");
  RAW_NULL_CHECK(names);

  int32_t idx = 0;
  while (idx < batch.nReqs) {
    SVDropTbReq* pOne = &batch.pReqs[idx];
    RAW_NULL_CHECK(tmqAddStringToArray(names, pOne->name));
    idx++;
  }

end:
  tDecoderClear(&reader);
  *pJson = root;
  RAW_LOG_END
  return code;
}
998

999
// Replays a raw "create super table" meta message on the connection `taos`.
//
// The SVCreateStbReq that follows the SMsgHead in `meta` is decoded and
// converted into an SMCreateStbReq (columns, tags, versions, suid, igExists),
// serialized and sent as TDMT_MND_CREATE_STB through launchQueryImpl. When the
// request succeeds the table's entry is removed from the catalog.
//
// taos     connection handle; must not be NULL.
// meta     raw message starting with an SMsgHead; must not be NULL.
// metaLen  length of `meta` in bytes; must cover at least the SMsgHead.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for bad arguments,
// TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database, or the
// code of the failing step / of the executed request.
static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // metaLen - sizeof(SMsgHead) below is unsigned; a short buffer would wrap
  // around and hand the decoder a huge length.
  if (metaLen < sizeof(SMsgHead)) {
    uError("invalid meta length:%u in %s", metaLen, __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateStbReq req = {0};
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SRequestObj*   pRequest = NULL;
  SCmdMsgInfo    pCmdMsg = {0};
  RAW_LOG_START

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%u", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&coder, &req));

  // no per-column compression in the message: fall back to per-type defaults
  int8_t           createDefaultCompress = 0;
  SColCmprWrapper* p = &req.colCmpr;
  if (p->nCols == 0) {
    createDefaultCompress = 1;
  }
  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
  RAW_NULL_CHECK(pReq.pColumns);
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema*          pSchema = req.schemaRow.pSchema + i;
    SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);

    if (createDefaultCompress) {
      field.compress = createDefaultColCmprByType(pSchema->type);
    } else {
      SColCmpr* pCmp = &req.colCmpr.pColCmpr[i];
      field.compress = pCmp->alg;
    }
    if (req.pExtSchemas) field.typeMod = req.pExtSchemas[i].typeMod;
    RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field));
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  RAW_NULL_CHECK(pReq.pTags);
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
    RAW_NULL_CHECK(taosArrayPush(pReq.pTags, &field));
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = processSuid(req.suid, pRequest->pDb);
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;
  pReq.virtualStb = req.virtualStb;

  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // first call with a NULL buffer only computes the serialized size
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  RAW_FALSE_CHECK(tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // remove the table's entry from the catalog after a successful create
    SCatalog* pCatalog = NULL;
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  taosMemoryFree(pCmdMsg.pMsg);
  RAW_LOG_END
  return code;
}
1107

UNCOV
1108
// Replays a raw "drop super table" meta message on the connection `taos`.
//
// The SVDropStbReq that follows the SMsgHead in `meta` is decoded, the table's
// uid is looked up through the catalog, and a TDMT_MND_DROP_STB request
// (igNotExists set) is sent through launchQueryImpl. A table the catalog
// reports as TSDB_CODE_PAR_TABLE_NOT_EXIST is treated as already dropped.
// When the request succeeds the table's entry is removed from the catalog.
//
// taos     connection handle; must not be NULL.
// meta     raw message starting with an SMsgHead; must not be NULL.
// metaLen  length of `meta` in bytes; must cover at least the SMsgHead.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for bad arguments,
// TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database, or the
// code of the failing step / of the executed request.
static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // metaLen - sizeof(SMsgHead) below is unsigned; a short buffer would wrap
  // around and hand the decoder a huge length.
  if (metaLen < sizeof(SMsgHead)) {
    uError("invalid meta length:%u in %s", metaLen, __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropStbReq req = {0};
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SRequestObj* pRequest = NULL;
  SCmdMsgInfo  pCmdMsg = {0};
  STableMeta*  pTableMeta = NULL;  // released at `end` on every path

  RAW_LOG_START
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%u", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVDropStbReq(&coder, &req));

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  SName            tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);

  code = catalogGetTableMeta(pCatalog, &conn, &tableName, &pTableMeta);
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    uInfo(LOG_ID_TAG " stable %s not exist, ignore drop", LOG_ID_VALUE, req.name);
    code = TSDB_CODE_SUCCESS;
    goto end;
  }
  RAW_RETURN_CHECK(code);
  // the request carries the uid reported by the catalog, not req.suid
  pReq.suid = pTableMeta->uid;
  taosMemoryFreeClear(pTableMeta);

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;

  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));

  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  // first call with a NULL buffer only computes the serialized size
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  RAW_FALSE_CHECK(tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // remove the table's entry from the catalog; a failure here is returned to the caller
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  RAW_LOG_END
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  taosMemoryFree(pTableMeta);
  // same cleanup as taosCreateStb: covers failures between allocation and send
  taosMemoryFree(pCmdMsg.pMsg);
  return code;
}
1197

1198
// Create-table batch bundled with vgroup information and a database name.
// destroyCreateTbReqBatch() casts its argument to this type and destroys
// req.pArray; taosCreateTable() installs that function as the free callback
// of its vgroup hash map.
typedef struct SVgroupCreateTableBatch {
1199
  SVCreateTbBatchReq req;  // batched requests; req.pArray is destroyed by destroyCreateTbReqBatch()
1200
  SVgroupInfo        info;  // NOTE(review): presumably the vgroup this batch is sent to -- confirm at the fill site
1201
  char               dbName[TSDB_DB_NAME_LEN];  // NOTE(review): presumably the database of the batched tables -- confirm
1202
} SVgroupCreateTableBatch;
1203

1204
static void destroyCreateTbReqBatch(void* data) {
14,314✔
1205
  if (data == NULL) {
14,314✔
UNCOV
1206
    uError("invalid parameter in %s", __func__);
×
UNCOV
1207
    return;
×
1208
  }
1209
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
14,314✔
1210
  taosArrayDestroy(pTbBatch->req.pArray);
14,314✔
1211
}
1212

1213
static const SSchema* getNormalColSchema(const STableMeta* pTableMeta, const char* pColName) {
5,672✔
1214
  for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
16,393✔
1215
    const SSchema* pSchema = pTableMeta->schema + i;
16,393✔
1216
    if (0 == strcmp(pColName, pSchema->name)) {
16,393✔
1217
      return pSchema;
5,672✔
1218
    }
1219
  }
UNCOV
1220
  return NULL;
×
1221
}
1222

1223
// Fetches the table meta of acctId/dbName/tbName through the catalog.
// Returns the meta obtained from catalogGetTableMeta on success (the callers
// in this file release it with taosMemoryFreeClear). On failure the error is
// logged, terrno is set to the catalog's error code and NULL is returned.
static STableMeta* getTableMeta(SCatalog* pCatalog, SRequestConnInfo* conn, char* dbName, char* tbName, int32_t acctId){
  STableMeta* pMeta = NULL;
  SName       fullName = {0};
  toName(acctId, dbName, tbName, &fullName);

  int32_t rc = catalogGetTableMeta(pCatalog, conn, &fullName, &pMeta);
  if (rc == 0) {
    return pMeta;
  }

  uError("failed to get table meta for reference table:%s.%s", dbName, tbName);
  taosMemoryFreeClear(pMeta);
  terrno = rc;
  return NULL;
}
1236

1237
// Validates that column `colName` of the referenced table `pTableMeta` may
// back the virtual-table column described by `pSchema`.
//
// Checks, in order:
//   1. the referenced table's timestamp precision equals `precision`;
//   2. the referenced table has no composite key (schema[1] lacks COL_IS_KEY);
//   3. the referenced table is a normal or child table;
//   4. `colName` exists in the referenced table;
//   5. type and bytes of that column equal those of `pSchema`.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE (checks 1
// and 5) or TSDB_CODE_PAR_INVALID_REF_COLUMN (checks 2-4). Each failure is
// logged.
static int32_t checkColRef(STableMeta* pTableMeta, char* colName, uint8_t precision, const SSchema* pSchema) {
  if (precision != pTableMeta->tableInfo.precision) {
    uError("timestamp precision of virtual table and its reference table do not match");
    return TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE;
  }

  // org table cannot has composite primary key
  if (pTableMeta->tableInfo.numOfColumns > 1 && (pTableMeta->schema[1].flags & COL_IS_KEY)) {
    uError("virtual table's column:\"%s\"'s reference can not from table with composite key", colName);
    return TSDB_CODE_PAR_INVALID_REF_COLUMN;
  }

  // org table must be child table or normal table
  if (!(pTableMeta->tableType == TSDB_NORMAL_TABLE || pTableMeta->tableType == TSDB_CHILD_TABLE)) {
    uError("virtual table's column:\"%s\"'s reference can only be normal table or child table", colName);
    return TSDB_CODE_PAR_INVALID_REF_COLUMN;
  }

  const SSchema* pTarget = getNormalColSchema(pTableMeta, colName);
  if (pTarget == NULL) {
    uError("virtual table's column:\"%s\"'s reference column:\"%s\" not exist", pSchema->name, colName);
    return TSDB_CODE_PAR_INVALID_REF_COLUMN;
  }

  if (!(pTarget->type == pSchema->type && pTarget->bytes == pSchema->bytes)) {
    uError("virtual table's column:\"%s\"'s type and reference column:\"%s\"'s type not match, %d %d %d %d",
            pSchema->name, colName, pSchema->type, pSchema->bytes, pTarget->type, pTarget->bytes);
    return TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE;
  }

  return TSDB_CODE_SUCCESS;
}
1275

1276
// Validates one column reference while creating a table: fetches the meta of
// the table named by pColRef (refDbName/refTableName) and runs checkColRef()
// for pColRef->refColName against `pSchema` and `precision`.
// Returns terrno when the referenced table's meta cannot be fetched,
// otherwise the result of checkColRef().
static int32_t checkColRefForCreate(SCatalog* pCatalog, SRequestConnInfo* conn, SColRef* pColRef, int32_t acctId, uint8_t precision, SSchema* pSchema) {
  STableMeta* pRefMeta = getTableMeta(pCatalog, conn, pColRef->refDbName, pColRef->refTableName, acctId);
  if (pRefMeta != NULL) {
    int32_t rc = checkColRef(pRefMeta, pColRef->refColName, precision, pSchema);
    taosMemoryFreeClear(pRefMeta);
    return rc;
  }
  return terrno;
}
1285

UNCOV
1286
// Validates a column reference for a column that is being added.
//
// (dbName, tbName, colName) name the table and column handed to checkColRef()
// as the referenced side; (dbNameSrc, tbNameSrc, colNameSrc) name the table
// whose timestamp precision is compared and the column name used in
// diagnostics. `type` and `bytes` describe the column being added and must
// match the referenced column.
//
// Returns 0 when the reference is valid, terrno when either table's meta
// cannot be fetched, otherwise the result of checkColRef().
static int32_t checkColRefForAdd(SCatalog* pCatalog, SRequestConnInfo* conn, int32_t acctId, char* dbName, char* tbName, char* colName,
  char* dbNameSrc, char* tbNameSrc, char* colNameSrc, int8_t type, int32_t bytes) {
  int32_t     code = 0;
  // Both pointers are initialised before the first `goto end`: the cleanup
  // below frees them unconditionally.
  STableMeta* pTableMeta = NULL;
  STableMeta* pTableMetaSrc = NULL;

  pTableMeta = getTableMeta(pCatalog, conn, dbName, tbName, acctId);
  if (pTableMeta == NULL) {
    code = terrno;
    goto end;
  }
  pTableMetaSrc = getTableMeta(pCatalog, conn, dbNameSrc, tbNameSrc, acctId);
  if (pTableMetaSrc == NULL) {
    code = terrno;
    goto end;
  }

  // the added column has no schema entry to look up, so describe it by hand
  SSchema pSchema = {.type = type, .bytes = bytes};
  tstrncpy(pSchema.name, colNameSrc, TSDB_COL_NAME_LEN);
  code = checkColRef(pTableMeta, colName, pTableMetaSrc->tableInfo.precision, &pSchema);

end:
  taosMemoryFreeClear(pTableMeta);
  taosMemoryFreeClear(pTableMetaSrc);
  return code;
}
1309

1310
// Validates a column reference for an existing column whose reference is
// being altered.
//
// (dbName, tbName, colName) name the table and column handed to checkColRef()
// as the referenced side; (dbNameSrc, tbNameSrc, colNameSrc) name the table
// and column whose schema entry and timestamp precision are compared with it.
//
// Returns 0 when the reference is valid, terrno when either table's meta
// cannot be fetched, TSDB_CODE_PAR_INVALID_REF_COLUMN when colNameSrc does not
// exist in the source table, otherwise the result of checkColRef().
static int32_t checkColRefForAlter(SCatalog* pCatalog, SRequestConnInfo* conn, int32_t acctId, char* dbName, char* tbName, char* colName,
  char* dbNameSrc, char* tbNameSrc, char* colNameSrc) {
  int32_t     code = 0;
  // Both pointers are initialised before the first `goto end`: the cleanup
  // below frees them unconditionally.
  STableMeta* pTableMeta = NULL;
  STableMeta* pTableMetaSrc = NULL;

  pTableMeta = getTableMeta(pCatalog, conn, dbName, tbName, acctId);
  if (pTableMeta == NULL) {
    code = terrno;
    goto end;
  }
  pTableMetaSrc = getTableMeta(pCatalog, conn, dbNameSrc, tbNameSrc, acctId);
  if (pTableMetaSrc == NULL) {
    code = terrno;
    goto end;
  }
  const SSchema* pSchema = getNormalColSchema(pTableMetaSrc, colNameSrc);
  if (NULL == pSchema) {
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
    uError("virtual table's column:\"%s\" not exist", colNameSrc);
    goto end;
  }

  code = checkColRef(pTableMeta, colName, pTableMetaSrc->tableInfo.precision, pSchema);

end:
  taosMemoryFreeClear(pTableMeta);
  taosMemoryFreeClear(pTableMetaSrc);
  return code;
}
1337

1338
/*
 * Replay a raw "create table" meta message against the database selected on the connection.
 *
 * meta points at an SMsgHead followed by an encoded SVCreateTbBatchReq; metaLen is the total
 * length including the head. Every request of the batch is forced to "create if not exists",
 * grouped by the vgroup its table hashes to, serialized per vgroup and executed synchronously.
 *
 * For (virtual) child tables the super table is looked up by name in the local catalog:
 *   - if it does not exist the child table is skipped;
 *   - the request's suid is replaced by the local super table uid;
 *   - column references are retargeted to tmqWriteRefDB when that option is non-empty and are
 *     validated when tmqWriteCheckRef is set;
 *   - tag column ids are remapped to the local schema and the tag blob is rebuilt if any changed.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; TSDB_CODE_INVALID_PARA for NULL arguments or a
 * metaLen shorter than the message head.
 */
static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead prefix, otherwise `metaLen - sizeof(SMsgHead)` below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;
  // Super table meta of the request currently processed. Kept at function scope so that every
  // `goto end` taken while it is held releases it.
  STableMeta*        pTableMeta = NULL;

  RAW_LOG_START
  // Rebuilt tag blobs are parked here so they stay alive until the request has been sent
  SArray* pTagList = taosArrayInit(0, POINTER_BYTES);
  RAW_NULL_CHECK(pTagList);
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVCreateTbBatchReq(&coder, &req));
  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
    // change tag cid to new cid
    if (pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) {
      SName sName = {0};
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        uInfo(LOG_ID_TAG " super table %s not exist, ignore create child table %s", LOG_ID_VALUE,
              pCreateReq->ctb.stbName, pCreateReq->name);
        code = TSDB_CODE_SUCCESS;
        taosMemoryFreeClear(pTableMeta);
        continue;
      }

      RAW_RETURN_CHECK(code);
      pCreateReq->ctb.suid = pTableMeta->uid;

      // Retarget every column reference to the configured reference database, if any
      bool changeDB = strlen(tmqWriteRefDB) > 0;
      for (int32_t i = 0; changeDB && i < pCreateReq->colRef.nCols; i++) {
        SColRef* pColRef = pCreateReq->colRef.pColRef + i;
        tstrncpy(pColRef->refDbName, tmqWriteRefDB, TSDB_DB_NAME_LEN);
      }

      // Validate each referenced column against the matching column of the local super table.
      // A failure jumps to `end`, which releases pTableMeta.
      for (int32_t i = 0; tmqWriteCheckRef && i < pCreateReq->colRef.nCols && i < pTableMeta->tableInfo.numOfColumns; i++) {
        SColRef* pColRef = pCreateReq->colRef.pColRef + i;
        if (!pColRef || !pColRef->hasRef) continue;
        SSchema* pSchema = pTableMeta->schema + i;
        RAW_RETURN_CHECK(checkColRefForCreate(pCatalog, &conn, pColRef, pTscObj->acctId, pTableMeta->tableInfo.precision, pSchema));
      }

      SArray* pTagVals = NULL;
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
      if (code != TSDB_CODE_SUCCESS) {
        uError("create tb invalid tag data %s", pCreateReq->name);
        goto end;
      }

      // Match every tag name of the request against the tag columns of the local super table
      // (stored after the normal columns in the schema) and adopt the local column id.
      bool rebuildTag = false;
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
        if (tName == NULL) {
          continue;
        }
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
          SSchema* tag = &pTableMeta->schema[j];
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
            if (pTagVal) {
              if (pTagVal->cid != tag->colId) {
                pTagVal->cid = tag->colId;
                rebuildTag = true;
              }
            } else {
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
            }
          }
        }
      }
      taosMemoryFreeClear(pTableMeta);
      if (rebuildTag) {
        STag* ppTag = NULL;
        code = tTagNew(pTagVals, 1, false, &ppTag);
        taosArrayDestroy(pTagVals);
        pTagVals = NULL;
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        if (NULL == taosArrayPush(pTagList, &ppTag)) {
          code = terrno;
          tTagFree(ppTag);
          goto end;
        }
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
      }
      taosArrayDestroy(pTagVals);
    }
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // Until the batch is stored in the hash map nobody else owns its array, so release it
      // here when filling or storing the batch fails.
      if (taosArrayPush(tBatch.req.pArray, pCreateReq) == NULL) {
        code = terrno;
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      tBatch.req.source = TD_REQ_FROM_TAOX;
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != TSDB_CODE_SUCCESS) {
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
    }
  }

  // Every request was skipped: nothing to send
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_NULL_CHECK(pQuery->pRoot);

  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  taosMemoryFreeClear(pTableMeta);
  tDeleteSVCreateTbBatchReq(&req);

  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  taosArrayDestroyP(pTagList, NULL);
  RAW_LOG_END
  return code;
}
1529

1530
typedef struct SVgroupDropTableBatch {
1531
  SVDropTbBatchReq req;
1532
  SVgroupInfo      info;
1533
  char             dbName[TSDB_DB_NAME_LEN];
1534
} SVgroupDropTableBatch;
1535

UNCOV
1536
static void destroyDropTbReqBatch(void* data) {
×
UNCOV
1537
  if (data == NULL) {
×
UNCOV
1538
    uError("invalid parameter in %s", __func__);
×
UNCOV
1539
    return;
×
1540
  }
1541
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
×
1542
  taosArrayDestroy(pTbBatch->req.pArray);
×
1543
}
1544

1545
/*
 * Replay a raw "drop table" meta message against the database selected on the connection.
 *
 * meta points at an SMsgHead followed by an encoded SVDropTbBatchReq; metaLen is the total
 * length including the head. Every request is forced to "ignore if not exists", tables unknown
 * to the local catalog are skipped, the suid of each request is replaced by the one from the
 * local table meta, and the requests are grouped per vgroup, serialized and executed
 * synchronously.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; TSDB_CODE_INVALID_PARA for NULL arguments or a
 * metaLen shorter than the message head.
 */
static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead prefix, otherwise `metaLen - sizeof(SMsgHead)` below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVDropTbBatchReq(&coder, &req));
  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to drop table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      code = TSDB_CODE_SUCCESS;
      uInfo(LOG_ID_TAG " table %s not exist, ignore drop", LOG_ID_VALUE, pDropReq->name);
      taosMemoryFreeClear(pTableMeta);
      continue;
    }
    RAW_RETURN_CHECK(code);
    // The suid carried by the message is replaced by the one of the local table
    tb_uid_t oldSuid = pDropReq->suid;
    pDropReq->suid = pTableMeta->suid;
    taosMemoryFreeClear(pTableMeta);
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
           pDropReq->suid);

    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // Until the batch is stored in the hash map nobody else owns its array, so release it
      // here when filling or storing the batch fails.
      if (taosArrayPush(tBatch.req.pArray, pDropReq) == NULL) {
        code = terrno;
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != TSDB_CODE_SUCCESS) {
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
    }
  }

  // Every request was skipped: nothing to send
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }
  code = pRequest->code;
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  RAW_LOG_END
  return code;
}
1658

1659
/*
 * Replay a raw "delete data" meta message by running the equivalent DELETE statement.
 *
 * meta points at an SMsgHead followed by an encoded SDeleteRes; metaLen is the total length
 * including the head. The statement deletes the rows of req.tableFName whose req.tsColName lies
 * in [req.skey, req.ekey]. "Table not exist" and "get meta error" results are treated as success.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; TSDB_CODE_INVALID_PARA for NULL arguments, a
 * metaLen shorter than the message head, or a statement that does not fit the SQL buffer.
 */
static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead prefix, otherwise `metaLen - sizeof(SMsgHead)` below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  char       sql[256] = {0};
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);

  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeDeleteRes(&coder, &req));
  int sqlLen = snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                        req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  // A truncated statement can still parse (e.g. with a shortened timestamp bound) and would then
  // delete the wrong range, so refuse to run it.
  if (sqlLen < 0 || (size_t)sqlLen >= sizeof(sql)) {
    uError("connId:0x%" PRIx64 " delete data sql does not fit into buffer, required:%d, size:%d", *(int64_t*)taos,
           sqlLen, (int32_t)sizeof(sql));
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto end;
  }

  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
  RAW_NULL_CHECK(res);
  SRequestObj* pRequest = (SRequestObj*)res;
  code = pRequest->code;
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));

end:
  RAW_LOG_END
  tDecoderClear(&coder);
  return code;
}
1694

1695
/*
 * Replay a raw "alter table" meta message against the database selected on the connection.
 *
 * meta points at an SMsgHead followed by an encoded SVAlterTbReq; metaLen is the total length
 * including the head. Depending on req.action the request is dispatched in one of three ways:
 *   - TSDB_ALTER_TABLE_UPDATE_OPTIONS: ignored;
 *   - TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL: the tables are grouped by the vgroup they hash
 *     to and one request per vgroup is built (tables unknown to the catalog are skipped);
 *   - TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL: the same request is sent to every vgroup of
 *     the database;
 *   - anything else: a single request is sent to the vgroup of req.tbName, after optionally
 *     retargeting the reference database (tmqWriteRefDB) and validating column references
 *     (tmqWriteCheckRef).
 * The resulting messages are executed synchronously. TSDB_CODE_TDB_TABLE_NOT_EXIST and
 * TSDB_CODE_NOT_FOUND results are treated as success.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; TSDB_CODE_INVALID_PARA for NULL arguments or a
 * metaLen shorter than the message head.
 */
static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead prefix, otherwise `metaLen - sizeof(SMsgHead)` below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVAlterTbReq   req = {0};
  SDecoder       dcoder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;    // SVgDataBlocks* per target vgroup, owned here until the query is launched
  SVgDataBlocks* pVgData = NULL;   // block under construction, not yet pushed to pArray
  SArray*        pVgList = NULL;
  SEncoder       coder = {0};
  SHashObj*      pVgroupHashmap = NULL;  // vgId -> SArray* of SUpdateTableTagVal (Type 1 only)

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);
  RAW_RETURN_CHECK(tDecodeSVAlterTbReq(&dcoder, &req));
  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    uInfo(LOG_ID_TAG " alter table action is UPDATE_OPTIONS, ignore", LOG_ID_VALUE);
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // Handle Type 1 batch modification with vnode grouping
  if (req.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL) {
    if (req.tables == NULL || taosArrayGetSize(req.tables) == 0) {
      uError(LOG_ID_TAG " Type 1 batch alter has empty tables array", LOG_ID_VALUE);
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }

    int32_t nTables = taosArrayGetSize(req.tables);
    uDebug(LOG_ID_TAG " Type 1 batch alter with %d tables, grouping by vnode", LOG_ID_VALUE, nTables);

    // Create hashmap to group tables by vgId
    pVgroupHashmap = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
    RAW_NULL_CHECK(pVgroupHashmap);

    // Group tables by vnode
    for (int32_t i = 0; i < nTables; i++) {
      SUpdateTableTagVal* pTable = taosArrayGet(req.tables, i);
      if (pTable == NULL || pTable->tbName == NULL) {
        uWarn(LOG_ID_TAG " Type 1 batch alter table[%d] has invalid name, skip", LOG_ID_VALUE, i);
        continue;
      }

      // Query vnode for this table
      SVgroupInfo vgInfo = {0};
      SName       pName = {0};
      toName(pTscObj->acctId, pRequest->pDb, pTable->tbName, &pName);
      code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgInfo);
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        uWarn(LOG_ID_TAG " Type 1 batch alter table %s not found, skip", LOG_ID_VALUE, pTable->tbName);
        code = TSDB_CODE_SUCCESS;
        continue;
      }
      RAW_RETURN_CHECK(code);

      // Add table to corresponding vnode's array
      SArray*  pTables = NULL;
      SArray** ppTables = taosHashGet(pVgroupHashmap, &vgInfo.vgId, sizeof(int32_t));
      if (ppTables != NULL) {
        pTables = *ppTables;
      } else {
        pTables = taosArrayInit(16, sizeof(SUpdateTableTagVal));
        RAW_NULL_CHECK(pTables);
        // The hash map stores a copy of the pointer; until that succeeds the array is ours to free
        code = taosHashPut(pVgroupHashmap, &vgInfo.vgId, sizeof(int32_t), &pTables, sizeof(void*));
        if (code != TSDB_CODE_SUCCESS) {
          lino = __LINE__;
          taosArrayDestroy(pTables);
          goto end;
        }
      }
      RAW_NULL_CHECK(taosArrayPush(pTables, pTable));
    }

    // Build and send separate request for each vnode
    pArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
    RAW_NULL_CHECK(pArray);

    void* pIter = taosHashIterate(pVgroupHashmap, NULL);
    while (pIter) {
      size_t   keyLen = 0;
      int32_t* pVgId = taosHashGetKey(pIter, &keyLen);
      SArray*  pTables = *(SArray**)pIter;
      int32_t  nTablesInVg = taosArrayGetSize(pTables);

      uDebug(LOG_ID_TAG " Type 1 batch alter: vgId:%d has %d tables", LOG_ID_VALUE, *pVgId, nTablesInVg);

      // Build SVAlterTbReq for this vnode
      SVAlterTbReq vgReq = {0};
      vgReq.action = req.action;
      vgReq.tbName = req.tbName;
      vgReq.source = TD_REQ_FROM_TAOX;
      vgReq.tables = pTables;

      // Encode request
      int tlen = 0;
      tEncodeSize(tEncodeSVAlterTbReq, &vgReq, tlen, code);
      if (code < 0) {
        uError(LOG_ID_TAG " Type 1 batch alter encode failed for vgId:%d, code:%s",
               LOG_ID_VALUE, *pVgId, tstrerror(code));
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }

      tlen += sizeof(SMsgHead);
      void* pMsg = taosMemoryMalloc(tlen);
      if (pMsg == NULL) {
        code = terrno;
        uError(LOG_ID_TAG " Type 1 batch alter malloc failed for vgId:%d, size:%d",
               LOG_ID_VALUE, *pVgId, tlen);
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }

      ((SMsgHead*)pMsg)->vgId = htonl(*pVgId);
      ((SMsgHead*)pMsg)->contLen = htonl(tlen);
      void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));

      SEncoder vgCoder = {0};
      tEncoderInit(&vgCoder, pBuf, tlen - sizeof(SMsgHead));
      code = tEncodeSVAlterTbReq(&vgCoder, &vgReq);
      tEncoderClear(&vgCoder);

      if (code < 0) {
        uError(LOG_ID_TAG " Type 1 batch alter encode2 failed for vgId:%d, code:%s",
               LOG_ID_VALUE, *pVgId, tstrerror(code));
        taosMemoryFree(pMsg);
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }

      // Create VgDataBlocks for this vnode
      pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
      if (pVgData == NULL) {
        code = terrno;
        taosMemoryFree(pMsg);
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }
      // From here on pVgData owns pMsg: the cleanup at `end` releases both, so the failure
      // paths below must not free pMsg themselves.
      pVgData->pData = pMsg;
      pVgData->size = tlen;
      pVgData->numOfTables = nTablesInVg;

      // Query vgroup info for first table to get endpoint
      SUpdateTableTagVal* pFirstTable = taosArrayGet(pTables, 0);
      SVgroupInfo         vgInfo = {0};
      SName               pName = {0};
      toName(pTscObj->acctId, pRequest->pDb, pFirstTable->tbName, &pName);
      code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgInfo);
      if (code != TSDB_CODE_SUCCESS) {
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }
      pVgData->vg = vgInfo;

      if (taosArrayPush(pArray, &pVgData) == NULL) {
        code = terrno;
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }

      pVgData = NULL;  // Ownership transferred to pArray
      pIter = taosHashIterate(pVgroupHashmap, pIter);
    }

    uInfo(LOG_ID_TAG " Type 1 batch alter: grouped %d tables into %d vnodes",
          LOG_ID_VALUE, nTables, (int32_t)taosArrayGetSize(pArray));
  } else if (req.action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL) {
    char  dbFName[TSDB_DB_FNAME_LEN] = {0};
    SName pName = {TSDB_TABLE_NAME_T, pTscObj->acctId, {0}, {0}};
    tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
    (void)tNameGetFullDbName(&pName, dbFName);
    RAW_RETURN_CHECK(catalogGetDBVgList(pCatalog, &conn, dbFName, &pVgList));

    pArray = taosArrayInit(taosArrayGetSize(pVgList), sizeof(void*));
    RAW_NULL_CHECK(pArray);
    // The same request is encoded once per vgroup of the database
    for (int i = 0; i < taosArrayGetSize(pVgList); ++i) {
      SVgroupInfo* pInfo = (SVgroupInfo*)taosArrayGet(pVgList, i);
      pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
      RAW_NULL_CHECK(pVgData);
      pVgData->vg = *pInfo;

      int tlen = 0;
      req.source = TD_REQ_FROM_TAOX;

      tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
      RAW_RETURN_CHECK(code);
      tlen += sizeof(SMsgHead);
      void* pMsg = taosMemoryMalloc(tlen);
      RAW_NULL_CHECK(pMsg);
      // Attach the buffer right away so the cleanup at `end` releases it if encoding fails
      pVgData->pData = pMsg;
      pVgData->size = tlen;
      ((SMsgHead*)pMsg)->vgId = htonl(pInfo->vgId);
      ((SMsgHead*)pMsg)->contLen = htonl(tlen);
      void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
      tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
      RAW_RETURN_CHECK(tEncodeSVAlterTbReq(&coder, &req));
      tEncoderClear(&coder);

      pVgData->numOfTables = 1;
      RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));
      pVgData = NULL;  // Ownership transferred to pArray
    }
  } else {
    // Single table or Type 2 modification - original logic
    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
    pArray = taosArrayInit(1, sizeof(void*));
    RAW_NULL_CHECK(pArray);

    pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    RAW_NULL_CHECK(pVgData);
    pVgData->vg = pInfo;

    int tlen = 0;
    req.source = TD_REQ_FROM_TAOX;

    if (strlen(tmqWriteRefDB) > 0) {
      req.refDbName = tmqWriteRefDB;
    }

    if (req.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF && tmqWriteCheckRef) {
      RAW_RETURN_CHECK(checkColRefForAlter(pCatalog, &conn, pTscObj->acctId, req.refDbName, req.refTbName, req.refColName,
        pRequest->pDb, req.tbName, req.colName));
    } else if (req.action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF && tmqWriteCheckRef) {
      RAW_RETURN_CHECK(checkColRefForAdd(pCatalog, &conn, pTscObj->acctId, req.refDbName, req.refTbName, req.refColName,
        pRequest->pDb, req.tbName, req.colName, req.type, req.bytes));
    }

    tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
    RAW_RETURN_CHECK(code);
    tlen += sizeof(SMsgHead);
    void* pMsg = taosMemoryMalloc(tlen);
    RAW_NULL_CHECK(pMsg);
    // Attach the buffer right away so the cleanup at `end` releases it if encoding fails
    pVgData->pData = pMsg;
    pVgData->size = tlen;
    ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
    ((SMsgHead*)pMsg)->contLen = htonl(tlen);
    void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
    tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
    RAW_RETURN_CHECK(tEncodeSVAlterTbReq(&coder, &req));

    pVgData->numOfTables = 1;
    RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));
    pVgData = NULL;
  }

  pQuery = NULL;
  RAW_RETURN_CHECK(nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery));
  if (NULL == pQuery) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  RAW_RETURN_CHECK(nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot));
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  // The data blocks were handed over to the query above; do not free them in the cleanup below
  pArray = NULL;

  code = pRequest->code;
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST || code == TSDB_CODE_NOT_FOUND) {
    code = TSDB_CODE_SUCCESS;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
    if (pRes->res != NULL) {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));

end:
  // Cleanup vnode grouping hashmap
  if (pVgroupHashmap != NULL) {
    void* pIter = taosHashIterate(pVgroupHashmap, NULL);
    while (pIter) {
      SArray* pTables = *(SArray**)pIter;
      taosArrayDestroy(pTables);
      pIter = taosHashIterate(pVgroupHashmap, pIter);
    }
    taosHashCleanup(pVgroupHashmap);
  }

  // Data blocks still owned here (the query was never launched)
  for (int i = 0; i < taosArrayGetSize(pArray); ++i) {
    SVgDataBlocks* pData = (SVgDataBlocks*)taosArrayGetP(pArray, i);
    if (pData != NULL) {
      taosMemoryFreeClear(pData->pData);
      taosMemoryFreeClear(pData);
    }
  }
  taosArrayDestroy(pArray);

  // Block that was being built when the failure happened
  if (pVgData) {
    taosMemoryFreeClear(pVgData->pData);
    taosMemoryFreeClear(pVgData);
  }
  taosArrayDestroy(pVgList);
  destroyRequest(pRequest);
  tDecoderClear(&dcoder);
  qDestroyQuery(pQuery);
  destroyAlterTbReq(&req);
  tEncoderClear(&coder);
  RAW_LOG_END
  return code;
}
2027

2028
/**
 * Write a raw data block into table `tbname` using the caller-supplied field
 * descriptors. Forwards every argument unchanged to
 * taos_write_raw_block_with_fields_with_reqid() and passes 0 as the reqid.
 *
 * @return whatever the reqid variant returns.
 */
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
                                     int numFields) {
  const int64_t noReqId = 0;
  int           rc = taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, noReqId);
  return rc;
}
2032

2033
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
299✔
2034
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
2035
  if (taos == NULL || pData == NULL || tbname == NULL) {
299✔
UNCOV
2036
    uError("invalid parameter in %s", __func__);
×
UNCOV
2037
    return TSDB_CODE_INVALID_PARA;
×
2038
  }
2039
  int32_t     code = TSDB_CODE_SUCCESS;
299✔
2040
  int32_t     lino = 0;
299✔
2041
  STableMeta* pTableMeta = NULL;
299✔
2042
  SQuery*     pQuery = NULL;
299✔
2043
  SHashObj*   pVgHash = NULL;
299✔
2044

2045
  SRequestObj* pRequest = NULL;
299✔
2046
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
299✔
2047

2048
  uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
299✔
2049
         rows, pData, tbname, fields, numFields);
2050

2051
  pRequest->syncQuery = true;
299✔
2052
  if (!pRequest->pDb) {
299✔
2053
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
2054
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
2055
    goto end;
×
2056
  }
2057

2058
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
299✔
2059
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
299✔
2060
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
299✔
2061

2062
  struct SCatalog* pCatalog = NULL;
299✔
2063
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
299✔
2064

2065
  SRequestConnInfo conn = {0};
299✔
2066
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
299✔
2067
  conn.requestId = pRequest->requestId;
299✔
2068
  conn.requestObjRefId = pRequest->self;
299✔
2069
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
299✔
2070

2071
  SVgroupInfo vgData = {0};
299✔
2072
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
299✔
2073
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
299✔
2074
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
299✔
2075
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
299✔
2076
  RAW_NULL_CHECK(pVgHash);
299✔
2077
  RAW_RETURN_CHECK(
299✔
2078
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
2079
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
299✔
2080
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
299✔
2081

2082
  launchQueryImpl(pRequest, pQuery, true, NULL);
299✔
2083
  code = pRequest->code;
299✔
2084
  uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code));
299✔
2085

2086
end:
299✔
2087
  taosMemoryFreeClear(pTableMeta);
299✔
2088
  qDestroyQuery(pQuery);
299✔
2089
  destroyRequest(pRequest);
299✔
2090
  taosHashCleanup(pVgHash);
299✔
2091
  RAW_LOG_END
299✔
2092
  return code;
299✔
2093
}
2094

2095
/**
 * Write a raw data block into table `tbname`. Forwards to
 * taos_write_raw_block_with_reqid() and passes 0 as the reqid.
 *
 * @return whatever the reqid variant returns.
 */
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  const int64_t noReqId = 0;
  int           rc = taos_write_raw_block_with_reqid(taos, rows, pData, tbname, noReqId);
  return rc;
}
2098

2099
/**
 * Write one raw data block into table `tbname` of the request's current database.
 *
 * Steps: build a synchronous request on `taos`, resolve the table's vgroup and
 * meta through the catalog, bind `pData` through rawBlockBindData() without any
 * field descriptors, build the output with smlBuildOutput() and launch it.
 *
 * @param rows  only used in the debug log of this function.
 * @return TSDB_CODE_INVALID_PARA when taos, pData or tbname is NULL;
 *         TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request carries no database;
 *         otherwise the code of the first failing step, or pRequest->code after
 *         the query has been launched.
 */
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
  if (taos == NULL || pData == NULL || tbname == NULL) {
    // Log the rejection like the other entry points of this file do.
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STableMeta* pTableMeta = NULL;
  SQuery*     pQuery = NULL;
  SHashObj*   pVgHash = NULL;

  SRequestObj* pRequest = NULL;
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));

  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // Fully qualified table name: account id + current database + tbname.
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));

  struct SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  SVgroupInfo vgData = {0};
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));

  // Single-entry vgroup map (vgId -> SVgroupInfo) handed to smlBuildOutput().
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgHash);
  RAW_RETURN_CHECK(
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false));
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;
  uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  taosMemoryFreeClear(pTableMeta);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  taosHashCleanup(pVgHash);
  RAW_LOG_END
  return code;
}
2157

2158
static void* getRawDataFromRes(void* pRetrieve) {
9,160✔
2159
  if (pRetrieve == NULL) {
9,160✔
2160
    uError("invalid parameter in %s", __func__);
×
2161
    return NULL;
×
2162
  }
2163
  void* rawData = NULL;
9,160✔
2164
  // deal with compatibility
2165
  if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
9,160✔
UNCOV
2166
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
2167
  } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
9,160✔
UNCOV
2168
             *(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
×
2169
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
9,160✔
2170
  }
2171
  return rawData;
9,160✔
2172
}
2173

2174
/**
 * Decode every create-table request carried by `rsp` and index it by table
 * name in `pHashObj`.
 *
 * Each decoded SVCreateTbReq is copied by value into the hash; a request whose
 * name is already present is destroyed instead of being stored. Only child
 * tables are accepted.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments,
 *         TSDB_CODE_INVALID_MSG for a non-child-table request, or the code of
 *         the first failing decode / hash operation.
 */
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
  if (rsp == NULL || pHashObj == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // find schema data info
  int32_t       code = 0;
  int32_t       lino = 0;
  SVCreateTbReq createReq = {0};
  SDecoder      reqDecoder = {0};
  RAW_LOG_START
  for (int idx = 0; idx < rsp->createTableNum; idx++) {
    void** ppBuf = taosArrayGet(rsp->createTableReq, idx);
    RAW_NULL_CHECK(ppBuf);
    int32_t* pBufLen = taosArrayGet(rsp->createTableLen, idx);
    RAW_NULL_CHECK(pBufLen);

    tDecoderInit(&reqDecoder, *ppBuf, *pBufLen);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&reqDecoder, &createReq));

    if (createReq.type != TSDB_CHILD_TABLE) {
      uError("invalid table type %d in %s", createReq.type, __func__);
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }

    const size_t nameLen = strlen(createReq.name);
    if (taosHashGet(pHashObj, createReq.name, nameLen) != NULL) {
      // Duplicate table name: keep the entry already stored, drop this one.
      tDestroySVCreateTbReq(&createReq, TSDB_MSG_FLG_DECODE);
    } else {
      RAW_RETURN_CHECK(taosHashPut(pHashObj, createReq.name, nameLen, &createReq, sizeof(SVCreateTbReq)));
    }

    tDecoderClear(&reqDecoder);
    // The hash now holds the (shallow) copy; zero the local so the cleanup
    // at `end` does not destroy what was stored.
    createReq = (SVCreateTbReq){0};
  }

end:
  tDecoderClear(&reqDecoder);
  tDestroySVCreateTbReq(&createReq, TSDB_MSG_FLG_DECODE);
  RAW_LOG_END
  return code;
}
2216

2217
// Initialization states; initedFlag below starts at WRITE_RAW_INIT_START.
// NOTE(review): the code that advances the state is not in this section.
typedef enum {
2218
  WRITE_RAW_INIT_START = 0,
2219
  WRITE_RAW_INIT_OK,
2220
  WRITE_RAW_INIT_FAIL,
2221
} WRITE_RAW_INIT_STATUS;
2222

2223
// Process-wide cache created by initRawCacheHash(); getRawCache() stores one
// rawCacheInfo per key (the bytes of the pointer passed as `key`), and
// freeRawCache() is registered as its free callback.
static SHashObj* writeRawCache = NULL;
2224
// NOTE(review): initFlag / initedFlag are not used in this section; presumably
// they guard one-time initialization of writeRawCache -- confirm at the use site.
static int8_t    initFlag = 0;
2225
static int8_t    initedFlag = WRITE_RAW_INIT_START;
2226

2227
// The three hashes cached per key by getRawCache().
typedef struct {
2228
  SHashObj* pVgHash;    // vgId -> SVgroupInfo (filled in processCacheMeta)
2229
  SHashObj* pNameHash;  // table name -> tbInfo (filled in processCacheMeta)
2230
  SHashObj* pMetaHash;  // suid -> STableMeta*; values released by tmqFreeMeta
2231
} rawCacheInfo;
2232

2233
// Per-table entry stored in pNameHash by processCacheMeta().
typedef struct {
2234
  SVgroupInfo vgInfo;  // result of catalogGetTableHashVgroup() for the table
2235
  int64_t     uid;     // uid of the fetched table meta
2236
  int64_t     suid;    // meta's suid for a child table, otherwise its uid
2237
} tbInfo;
2238

2239
static void tmqFreeMeta(void* data) {
1,004✔
2240
  if (data == NULL) {
1,004✔
UNCOV
2241
    uError("invalid parameter in %s", __func__);
×
UNCOV
2242
    return;
×
2243
  }
2244
  STableMeta* pTableMeta = *(STableMeta**)data;
1,004✔
2245
  taosMemoryFree(pTableMeta);
1,004✔
2246
}
2247

2248
static void freeRawCache(void* data) {
×
2249
  if (data == NULL) {
×
2250
    uError("invalid parameter in %s", __func__);
×
UNCOV
2251
    return;
×
2252
  }
2253
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
2254
  taosHashCleanup(pRawCache->pMetaHash);
×
2255
  taosHashCleanup(pRawCache->pNameHash);
×
UNCOV
2256
  taosHashCleanup(pRawCache->pVgHash);
×
2257
}
2258

2259
static int32_t initRawCacheHash() {
3,100✔
2260
  if (writeRawCache == NULL) {
3,100✔
2261
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
3,100✔
2262
    if (writeRawCache == NULL) {
3,100✔
UNCOV
2263
      return terrno;
×
2264
    }
2265
    taosHashSetFreeFp(writeRawCache, freeRawCache);
3,100✔
2266
  }
2267
  return 0;
3,100✔
2268
}
2269

2270
/**
 * Decide whether the cached table meta no longer matches an incoming raw block.
 *
 * @param rawData     encoded raw block; its column-schema section is checked
 *                    against the cached columns with checkSchema().
 * @param pTableMeta  cached table meta to compare against.
 * @param pSW         schema wrapper describing the block's columns.
 * @return true when the column count differs, a block column has no cached
 *         column of the same name, or checkSchema() rejects a matching column;
 *         false otherwise, and also when any argument is NULL.
 */
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
  if (rawData == NULL || pSW == NULL) {
    return false;
  }
  if (pTableMeta == NULL) {
    uError("invalid parameter in %s", __func__);
    return false;
  }

  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
  // column length |
  // NOTE(review): the layout comment names six fields before the group id but
  // only five int32 values are skipped here, exactly as before -- confirm.
  const size_t headerLen = 5 * sizeof(int32_t) + sizeof(uint64_t);

  // Explicit cast: the schema entries are consumed as int8_t*, and converting
  // char* to int8_t* implicitly is a pointer-sign constraint violation.
  int8_t* fields = (int8_t*)((char*)rawData + headerLen);

  const int32_t numOfColumns = pTableMeta->tableInfo.numOfColumns;
  if (pSW->nCols != numOfColumns) {
    return true;
  }

  for (int i = 0; i < pSW->nCols; i++) {
    char* fieldName = pSW->pSchema[i].name;
    int   j = 0;
    for (; j < numOfColumns; j++) {
      SSchema*    pColSchema = &pTableMeta->schema[j];
      SSchemaExt* pColExtSchema = &pTableMeta->schemaExt[j];

      if (strcmp(pColSchema->name, fieldName) == 0) {
        if (checkSchema(pColSchema, pColExtSchema, fields, NULL, 0) != 0) {
          return true;
        }
        break;
      }
    }
    // Each column-schema entry is one int8_t followed by one int32_t.
    fields += sizeof(int8_t) + sizeof(int32_t);

    // No cached column carries this name.
    if (j == numOfColumns) return true;
  }
  return false;
}
2313

2314
/**
 * Look up (or create and register) the three per-key hashes held in
 * writeRawCache.
 *
 * The key is hashed by pointer value: the bytes of `key` itself, not the
 * memory it points to. When no entry exists, three new hashes are created,
 * tmqFreeMeta() is registered on the meta hash, and a rawCacheInfo holding
 * them is stored in writeRawCache.
 *
 * On failure every hash created by this call is destroyed and all three
 * outputs are set to NULL, so the caller never holds a dangling pointer.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the
 *         code of the failing hash operation.
 */
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  RAW_LOG_START
  void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
  if (cacheInfo == NULL) {
    // Start from a known state so the failure path below only ever destroys
    // hashes created here, whatever the caller left in the out-params.
    *pVgHash = NULL;
    *pNameHash = NULL;
    *pMetaHash = NULL;

    *pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(*pVgHash);
    *pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(*pNameHash);
    *pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(*pMetaHash);
    taosHashSetFreeFp(*pMetaHash, tmqFreeMeta);
    rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash};
    RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));
  } else {
    rawCacheInfo* info = (rawCacheInfo*)cacheInfo;
    *pVgHash = info->pVgHash;
    *pNameHash = info->pNameHash;
    *pMetaHash = info->pMetaHash;
  }

end:
  if (code != 0) {
    // Only the creation branch can fail, so these are hashes made above.
    taosHashCleanup(*pMetaHash);
    taosHashCleanup(*pNameHash);
    taosHashCleanup(*pVgHash);
    *pMetaHash = NULL;
    *pNameHash = NULL;
    *pVgHash = NULL;
  }
  RAW_LOG_END
  return code;
}
2349

2350
/**
 * Build a synchronous request on `taos`, fetch the catalog handle for its
 * cluster and fill `conn` from the request.
 *
 * `*pRequest` is written by buildRequest() and is not released here on
 * failure; the callers in this file destroy it on their own cleanup path.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments,
 *         TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
 *         or the code of the failing step.
 */
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
  if (taos == NULL || pRequest == NULL || pCatalog == NULL || conn == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  RAW_LOG_START
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));

  SRequestObj* pReq = *pRequest;
  pReq->syncQuery = true;
  if (pReq->pDb == NULL) {
    uError("%s no database selected", __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  RAW_RETURN_CHECK(catalogGetHandle(pReq->pTscObj->pAppInfo->clusterId, pCatalog));

  conn->pTrans = pReq->pTscObj->pAppInfo->pTransporter;
  conn->requestId = pReq->requestId;
  conn->requestObjRefId = pReq->self;
  conn->mgmtEps = getEpSet_s(&pReq->pTscObj->pAppInfo->mgmtEp);

end:
  RAW_LOG_END
  return code;
}
2376

2377
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
2378
static int32_t  decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func,
6,838✔
2379
                              SMqRspObj* rspObj) {
2380
  if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) {
6,838✔
UNCOV
2381
    uError("invalid parameter in %s", __func__);
×
UNCOV
2382
    return TSDB_CODE_INVALID_PARA;
×
2383
  }
2384
  int8_t dataVersion = *(int8_t*)data;
6,838✔
2385
  if (dataVersion >= MQ_DATA_RSP_VERSION) {
6,838✔
2386
    data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
6,838✔
2387
    if (dataLen < sizeof(int8_t) + sizeof(int32_t)) {
6,838✔
UNCOV
2388
      return TSDB_CODE_INVALID_PARA;
×
2389
    }
2390
    dataLen -= sizeof(int8_t) + sizeof(int32_t);
6,838✔
2391
  }
2392

2393
  rspObj->resIter = -1;
6,838✔
2394
  tDecoderInit(decoder, data, dataLen);
6,838✔
2395
  int32_t code = func(decoder, &rspObj->dataRsp);
6,838✔
2396
  if (code != 0) {
6,838✔
UNCOV
2397
    SET_ERROR_MSG("decode mq taosx data rsp failed");
×
2398
  }
2399
  return code;
6,838✔
2400
}
2401

2402
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
9,160✔
2403
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
2404
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
2405
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
9,160✔
2406
      pMeta == NULL) {
2407
    uError("invalid parameter in %s", __func__);
×
2408
    return TSDB_CODE_INVALID_PARA;
×
2409
  }
2410
  int32_t code = 0;
9,160✔
2411
  int32_t lino = 0;
9,160✔
2412
  RAW_LOG_START
9,160✔
2413
  STableMeta* pTableMeta = NULL;
9,160✔
2414
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
9,160✔
2415
  if (tmpInfo == NULL || retry > 0) {
9,160✔
2416
    tbInfo info = {0};
9,160✔
2417

2418
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
9,160✔
2419
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
9,160✔
2420
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
1,380✔
2421
    }
2422
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
9,160✔
2423
    info.uid = pTableMeta->uid;
9,160✔
2424
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
9,160✔
2425
      info.suid = pTableMeta->suid;
5,772✔
2426
    } else {
2427
      info.suid = pTableMeta->uid;
3,388✔
2428
    }
2429
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
9,160✔
2430
    RAW_RETURN_CHECK(code);
9,160✔
2431

2432
    uDebug("put table meta to hash1, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d", info.suid,
9,160✔
2433
           taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2434
    if (pCreateReqDst) {
9,160✔
2435
      pTableMeta->vgId = info.vgInfo.vgId;
1,380✔
2436
      pTableMeta->uid = pCreateReqDst->uid;
1,380✔
2437
      pCreateReqDst->ctb.suid = pTableMeta->suid;
1,380✔
2438
    }
2439

2440
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
9,160✔
2441
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
9,160✔
2442
    RAW_RETURN_CHECK(
9,160✔
2443
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
2444
  }
2445

2446
  if (pTableMeta == NULL || retry > 0) {
9,160✔
UNCOV
2447
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
×
2448
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
×
2449
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
×
2450
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
×
UNCOV
2451
      RAW_RETURN_CHECK(code);
×
2452
      uDebug("put table meta to hash2, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d",
×
2453
             tmpInfo->suid, taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2454
    } else {
2455
      pTableMeta = *pTableMetaTmp;
×
2456
      pTableMeta->uid = tmpInfo->uid;
×
2457
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
×
2458
    }
2459
  }
2460
  *pMeta = pTableMeta;
9,160✔
2461
  pTableMeta = NULL;
9,160✔
2462

2463
end:
9,160✔
2464
  taosMemoryFree(pTableMeta);
9,160✔
2465
  RAW_LOG_END
9,160✔
2466
  return code;
9,160✔
2467
}
2468

2469
/**
 * Write a raw tmq data payload: decode it with tDecodeMqDataRsp, bind every
 * block to its table meta (through the per-connection cache) and launch the
 * resulting insert.
 *
 * When the launched request ends with a code accepted by
 * NEED_CLIENT_HANDLE_ERROR, the whole bind/launch sequence is repeated from
 * the first block, up to three retries, with `retry` passed to
 * processCacheMeta().
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the
 *         code of the failing step / of the request.
 */
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  SQuery*   pQuery = NULL;
  SMqRspObj rspObj = {0};
  SDecoder  decoder = {0};

  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  // dataLen is unsigned: print it with %u.
  uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%u", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));

  // Borrowed from writeRawCache; not released by this function.
  SHashObj* pVgHash = NULL;
  SHashObj* pNameHash = NULL;
  SHashObj* pMetaHash = NULL;
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
  int retry = 0;
  while (1) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    // This is the plain-data path, so the log must not say "meta data".
    uDebug(LOG_ID_TAG " write raw data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
      RAW_NULL_CHECK(pSW);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // pTableMeta is owned by the meta cache; do not free it here.
      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, pSW,
                                        rawData, retry));
      char err[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
        goto end;
      }
    }
    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
      // Rebuild the statement from the first block on the next pass.
      qDestroyQuery(pQuery);
      pQuery = NULL;
      rspObj.resIter = -1;
      continue;
    }
    break;
  }
  uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  RAW_LOG_END
  return code;
}
2543

2544
/**
 * Write a raw tmq "meta + data" payload: decode it with tDecodeSTaosxRsp,
 * index the embedded create-table requests by table name, bind each data
 * block (with its create request when one exists) and launch the insert.
 *
 * When the launched request ends with a code accepted by
 * NEED_CLIENT_HANDLE_ERROR, binding and launching are repeated from the
 * first block, up to three retries.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the
 *         code of the failing step / of the request.
 */
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCtg = NULL;
  SRequestConnInfo connInfo = {0};
  SQuery*          pStmtQuery = NULL;
  SMqRspObj        rsp = {0};
  SDecoder         rspDecoder = {0};
  SHashObj*        pCreateReqs = NULL;  // table name -> SVCreateTbReq (by value)

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCtg, &connInfo));
  uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&rspDecoder, data, dataLen, tDecodeSTaosxRsp, &rsp));

  pCreateReqs = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pCreateReqs);
  RAW_RETURN_CHECK(buildCreateTbMap(&rsp.dataRsp, pCreateReqs));

  // Borrowed from writeRawCache; not released by this function.
  SHashObj* pVgCache = NULL;
  SHashObj* pNameCache = NULL;
  SHashObj* pMetaCache = NULL;
  RAW_RETURN_CHECK(getRawCache(&pVgCache, &pNameCache, &pMetaCache, taos));

  int retry = 0;
  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pStmtQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rsp.dataRsp.blockNum);

    // resIter is -1 on entry (set by decodeRawData or by the retry branch).
    for (rsp.resIter++; rsp.resIter < rsp.dataRsp.blockNum; rsp.resIter++) {
      const char* tbName = (const char*)taosArrayGetP(rsp.dataRsp.blockTbName, rsp.resIter);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSchema = (SSchemaWrapper*)taosArrayGetP(rsp.dataRsp.blockSchema, rsp.resIter);
      RAW_NULL_CHECK(pSchema);
      void* pRetrieve = taosArrayGetP(rsp.dataRsp.blockData, rsp.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* pBlock = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(pBlock);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName tbFullName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(tbFullName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(tbFullName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // find schema data info
      SVCreateTbReq* pCreateReq =
          (SVCreateTbReq*)taosHashGet(pCreateReqs, tbFullName.tname, strlen(tbFullName.tname));
      STableMeta* pMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgCache, pNameCache, pMetaCache, pCreateReq, pCtg, &connInfo, &tbFullName,
                                        &pMeta, pSchema, pBlock, retry));

      char err[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pStmtQuery, pMeta, pBlock, pCreateReq, pSchema, pSchema->nCols, true, err, ERR_MSG_LEN,
                              true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", tbFullName.tname, err);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pStmtQuery, pVgCache));
    launchQueryImpl(pRequest, pStmtQuery, true, NULL);
    code = pRequest->code;

    if (!(NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3)) {
      break;
    }
    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pStmtQuery);
    pStmtQuery = NULL;
    rsp.resIter = -1;
  }
  uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteSTaosxRsp(&rsp.dataRsp);
  // The hash stores SVCreateTbReq by value: destroy each one before the hash.
  for (void* pIter = taosHashIterate(pCreateReqs, NULL); pIter != NULL;
       pIter = taosHashIterate(pCreateReqs, pIter)) {
    tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
  }
  taosHashCleanup(pCreateReqs);
  tDecoderClear(&rspDecoder);
  qDestroyQuery(pStmtQuery);
  destroyRequest(pRequest);
  RAW_LOG_END
  return code;
}
2633

UNCOV
2634
/**
 * Write a raw tmq "raw data" payload: decode it with tDecodeMqDataRsp and
 * attach each block to the statement with rawBlockBindRawData(), grouping
 * through a temporary per-attempt hash, then launch the request.
 *
 * When the launched request ends with a code accepted by
 * NEED_CLIENT_HANDLE_ERROR, binding and launching are repeated from the
 * first block, up to three retries.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the
 *         code of the failing step / of the request.
 */
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t   code = TSDB_CODE_SUCCESS;
  int32_t   lino = 0;
  SQuery*   pQuery = NULL;
  SHashObj* pVgroupHash = NULL;  // rebuilt on every attempt, released before launch
  SMqRspObj rspObj = {0};
  SDecoder  decoder = {0};

  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  // dataLen is unsigned: print it with %u.
  uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%u", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));

  // Borrowed from writeRawCache; not released by this function.
  SHashObj* pVgHash = NULL;
  SHashObj* pNameHash = NULL;
  SHashObj* pMetaHash = NULL;
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
  int retry = 0;
  while (1) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
    pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(pVgroupHash);
    pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
    RAW_NULL_CHECK(pStmt->pVgDataBlocks);

    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // find schema data info
      // No schema wrapper or raw data is passed: the meta cache skips its
      // schema comparison for this path.
      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, NULL,
                                        NULL, retry));
      code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
      if (code != TSDB_CODE_SUCCESS) {
        // rawBlockBindRawData() fills no message buffer, so report the code
        // text instead of an always-empty string.
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, tstrerror(code));
        goto end;
      }
    }
    taosHashCleanup(pVgroupHash);
    pVgroupHash = NULL;

    RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
      // Rebuild the statement from the first block on the next pass.
      qDestroyQuery(pQuery);
      pQuery = NULL;
      rspObj.resIter = -1;
      continue;
    }
    break;
  }
  uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  taosHashCleanup(pVgroupHash);
  destroyRequest(pRequest);
  RAW_LOG_END
  return code;
}
2719

2720
// Build the JSON description of a single meta response.
//
// The handler is selected by pMetaRsp->resMsgType. A message type that is not
// listed below is not an error: *meta is left untouched and 0 is returned.
//
// Returns TSDB_CODE_INVALID_PARA when either argument is NULL, otherwise the
// code produced by the selected handler (0 on success).
static int32_t processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
  if (meta == NULL || pMetaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = 0;
  RAW_LOG_START

  switch (pMetaRsp->resMsgType) {
    case TDMT_VND_CREATE_STB:
      RAW_RETURN_CHECK(processCreateStb(pMetaRsp, meta));
      break;
    case TDMT_VND_ALTER_STB:
      RAW_RETURN_CHECK(processAlterStb(pMetaRsp, meta));
      break;
    case TDMT_VND_DROP_STB:
      RAW_RETURN_CHECK(processDropSTable(pMetaRsp, meta));
      break;
    case TDMT_VND_CREATE_TABLE:
      RAW_RETURN_CHECK(processCreateTable(pMetaRsp, meta));
      break;
    case TDMT_VND_ALTER_TABLE:
      RAW_RETURN_CHECK(processAlterTable(pMetaRsp, meta));
      break;
    case TDMT_VND_DROP_TABLE:
      RAW_RETURN_CHECK(processDropTable(pMetaRsp, meta));
      break;
    case TDMT_VND_DELETE:
      RAW_RETURN_CHECK(processDeleteTable(pMetaRsp, meta));
      break;
    default:
      // unknown message type: nothing to convert
      break;
  }

end:
  RAW_LOG_END
  return code;
}
2748

2749
// Decode a batch meta response and render it as one JSON document:
//   { "tmq_meta_version": TMQ_META_VERSION, "metas": [ <one item per meta> ] }
//
// Each entry of the decoded batch is an SMqRspHead followed by an encoded
// SMqMetaRsp; the entry is decoded and converted with processSimpleMeta().
// Entries that yield no JSON item (pItem stays NULL) are skipped.
//
// On success *string receives the buffer returned by cJSON_PrintUnformatted().
// On failure *string is not written.
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the code of
// the first failing step.
static int32_t processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
  if (pMsgRsp == NULL || string == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDecoder        coder = {0};
  SMqBatchMetaRsp rsp = {0};
  int32_t         code = 0;
  int32_t         lino = 0;
  cJSON*          pJson = NULL;
  tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
  RAW_RETURN_CHECK(tDecodeMqBatchMetaRsp(&coder, &rsp));

  pJson = cJSON_CreateObject();
  RAW_NULL_CHECK(pJson);
  RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION));
  cJSON* pMetaArr = cJSON_AddArrayToObject(pJson, "metas");
  RAW_NULL_CHECK(pMetaArr);

  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // The payload starts after the response head; a shorter entry would make
    // the unsigned subtraction below wrap to a huge length.
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      code = TSDB_CODE_TMQ_INVALID_MSG;
      lino = __LINE__;
      goto end;
    }

    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    cJSON*     pItem = NULL;
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    code = tDecodeMqMetaRsp(&metaCoder, &metaRsp);
    if (code == 0) {
      code = processSimpleMeta(&metaRsp, &pItem);
    }
    // Release the per-entry resources on every path, not only on success.
    tDeleteMqMetaRsp(&metaRsp);
    tDecoderClear(&metaCoder);
    if (code != 0) {
      lino = __LINE__;
      goto end;
    }
    if (pItem != NULL) {
      RAW_FALSE_CHECK(cJSON_AddItemToArray(pMetaArr, pItem));
    }
  }

  char* fullStr = cJSON_PrintUnformatted(pJson);
  // Printing can fail (returns NULL); do not report that as a success.
  RAW_NULL_CHECK(fullStr);
  *string = fullStr;

end:
  cJSON_Delete(pJson);
  tDeleteMqBatchMetaRsp(&rsp);
  tDecoderClear(&coder);
  RAW_LOG_END
  return code;
}
2793

2794
// Produce the JSON text describing the meta content of a TMQ result.
//
// Accepted result kinds are meta, metadata and batch-meta (checked with the
// TD_RES_TMQ_* macros); any other kind, or a NULL res, yields NULL.
//
// Returns the JSON string on success or NULL on failure. The internal error
// code is only logged (RAW_LOG_END); it is not returned to the caller.
char* tmq_get_json_meta(TAOS_RES* res) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  char*   string = NULL;
  RAW_LOG_START
  RAW_NULL_CHECK(res);
  RAW_FALSE_CHECK(TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res));

  SMqRspObj* rspObj = (SMqRspObj*)res;
  if (TD_RES_TMQ_METADATA(res)) {
    RAW_RETURN_CHECK(processAutoCreateTable(&rspObj->dataRsp, &string));
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    RAW_RETURN_CHECK(processBatchMetaToJson(&rspObj->batchMetaRsp, &string));
  } else if (TD_RES_TMQ_META(res)) {
    cJSON* pJson = NULL;
    RAW_RETURN_CHECK(processSimpleMeta(&rspObj->metaRsp, &pJson));
    // pJson stays NULL for message types processSimpleMeta() does not handle;
    // both cJSON calls below accept NULL, and string then remains NULL.
    string = cJSON_PrintUnformatted(pJson);
    cJSON_Delete(pJson);
  } else {
    uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res);
  }

  // string may legitimately be NULL here; never hand NULL to a %s conversion.
  uDebug("tmq_get_json_meta string:%s", string != NULL ? string : "(null)");

end:
  RAW_LOG_END
  return string;
}
2822

2823
// Free a JSON meta string with taosMemoryFreeClear().
// NOTE(review): presumably the counterpart of tmq_get_json_meta() - confirm.
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}
29,596✔
2824

2825
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
6,886✔
2826
  if (pRsp == NULL) {
6,886✔
2827
    uError("invalid parameter in %s", __func__);
×
UNCOV
2828
    return TSDB_CODE_INVALID_PARA;
×
2829
  }
2830
  int32_t pos = 0;
6,886✔
2831
  int32_t code = 0;
6,886✔
2832
  int32_t lino = 0;
6,886✔
2833
  RAW_LOG_START
6,886✔
2834
  SEncoder coder = {0};
6,886✔
2835
  tEncoderInit(&coder, NULL, 0);
6,886✔
2836
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset));
6,886✔
2837
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset));
6,886✔
2838
  pos = coder.pos;
6,886✔
2839
  tEncoderClear(&coder);
6,886✔
2840

2841
end:
6,886✔
2842
  if (code != 0) {
6,886✔
UNCOV
2843
    uError("getOffSetLen failed, code:%d", code);
×
UNCOV
2844
    return code;
×
2845
  } else {
2846
    uDebug("getOffSetLen success, len:%d", pos);
6,886✔
2847
    return pos;
6,886✔
2848
  }
2849
}
2850

2851
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
2852
static int32_t  encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
6,886✔
2853
  if (raw == NULL || encodeFunc == NULL || rspObj == NULL) {
6,886✔
UNCOV
2854
    uError("invalid parameter in %s", __func__);
×
UNCOV
2855
    return TSDB_CODE_INVALID_PARA;
×
2856
  }
2857
  uint32_t len = 0;
6,886✔
2858
  int32_t  code = 0;
6,886✔
2859
  int32_t  lino = 0;
6,886✔
2860
  SEncoder encoder = {0};
6,886✔
2861
  void*    buf = NULL;
6,886✔
2862
  tEncodeSize(encodeFunc, rspObj, len, code);
6,886✔
2863
  RAW_FALSE_CHECK(code >= 0);
6,886✔
2864
  len += sizeof(int8_t) + sizeof(int32_t);
6,886✔
2865
  buf = taosMemoryCalloc(1, len);
6,886✔
2866
  RAW_NULL_CHECK(buf);
6,886✔
2867
  tEncoderInit(&encoder, buf, len);
6,886✔
2868
  RAW_RETURN_CHECK(tEncodeI8(&encoder, MQ_DATA_RSP_VERSION));
6,886✔
2869
  int32_t offsetLen = getOffSetLen(rspObj);
6,886✔
2870
  RAW_FALSE_CHECK(offsetLen > 0);
6,886✔
2871
  RAW_RETURN_CHECK(tEncodeI32(&encoder, offsetLen));
6,886✔
2872
  RAW_RETURN_CHECK(encodeFunc(&encoder, rspObj));
6,886✔
2873

2874
  raw->raw = buf;
6,886✔
2875
  buf = NULL;
6,886✔
2876
  raw->raw_len = len;
6,886✔
2877

2878
end:
6,886✔
2879
  RAW_LOG_END
6,886✔
2880
  return code;
6,886✔
2881
}
2882

2883
// Fill *raw with the raw form of a TMQ result.
//
// *raw is zeroed first, then populated according to the result kind, tested in
// this order:
//   meta        - points at metaRsp.metaRsp; raw_len is metaRspLen, or 0 when it is negative
//   data        - newly encoded buffer (tEncodeMqDataRsp), raw_type RES_TYPE__TMQ
//   metadata    - newly encoded buffer (tEncodeSTaosxRsp), raw_type RES_TYPE__TMQ_METADATA
//   batch meta  - points at batchMetaRsp.pMetaBuff
//   raw data    - takes dataRsp.rawData and sets the source pointer to NULL
// Any other kind yields TSDB_CODE_TMQ_INVALID_MSG.
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or an error code.
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (res == NULL || raw == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  RAW_LOG_START

  SMqRspObj* pRsp = (SMqRspObj*)res;
  *raw = (tmq_raw_data){0};

  if (TD_RES_TMQ_META(res)) {
    SMqMetaRsp* pMeta = &pRsp->metaRsp;
    raw->raw = pMeta->metaRsp;
    if (pMeta->metaRspLen >= 0) {
      raw->raw_len = pMeta->metaRspLen;  // otherwise stays 0 from the zero-init above
    }
    raw->raw_type = pMeta->resMsgType;
    uDebug("tmq get raw type meta:%p", raw);
    goto end;
  }

  if (TD_RES_TMQ(res)) {
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeMqDataRsp, &pRsp->dataRsp, raw));
    raw->raw_type = RES_TYPE__TMQ;
    uDebug("tmq get raw type data:%p", raw);
    goto end;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeSTaosxRsp, &pRsp->dataRsp, raw));
    raw->raw_type = RES_TYPE__TMQ_METADATA;
    uDebug("tmq get raw type metadata:%p", raw);
    goto end;
  }

  if (TD_RES_TMQ_BATCH_META(res)) {
    raw->raw = pRsp->batchMetaRsp.pMetaBuff;
    raw->raw_len = pRsp->batchMetaRsp.metaBuffLen;
    raw->raw_type = pRsp->resType;
    uDebug("tmq get raw batch meta:%p", raw);
    goto end;
  }

  if (TD_RES_TMQ_RAW(res)) {
    // the buffer is moved out of the response object
    raw->raw = pRsp->dataRsp.rawData;
    pRsp->dataRsp.rawData = NULL;
    raw->raw_len = pRsp->dataRsp.len;
    raw->raw_type = pRsp->resType;
    uDebug("tmq get raw raw:%p", raw);
    goto end;
  }

  uError("tmq get raw error type:%d", *(int8_t*)res);
  code = TSDB_CODE_TMQ_INVALID_MSG;

end:
  RAW_LOG_END
  return code;
}
2926

2927
// Release the buffer held by a tmq_raw_data value and reset terrMsg.
//
// Only some raw types own their buffer:
//   RES_TYPE__TMQ / RES_TYPE__TMQ_METADATA - raw.raw is freed directly
//   RES_TYPE__TMQ_RAWDATA                  - the address sizeof(SMqRspHead) bytes
//                                            before raw.raw is freed
//   anything else                          - nothing is freed here
// NOTE(review): the RAWDATA case implies raw.raw points just past an SMqRspHead
// at the start of the allocation - confirm against the producer of rawData.
void tmq_free_raw(tmq_raw_data raw) {
  uDebug("tmq free raw data type:%d", raw.raw_type);
  if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
    taosMemoryFree(raw.raw);
  } else if (raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL) {
    // Step back with an explicit subtraction: negating sizeof() gives a huge
    // unsigned value rather than a negative offset.
    void* pHead = (char*)raw.raw - sizeof(SMqRspHead);
    taosMemoryFree(pHead);
  }
  (void)memset(terrMsg, 0, ERR_MSG_LEN);
}
29,018✔
2936

2937
static int32_t writeRawInit() {
35,741✔
2938
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
38,841✔
2939
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
3,100✔
2940
    if (old == 0) {
3,100✔
2941
      int32_t code = initRawCacheHash();
3,100✔
2942
      if (code != 0) {
3,100✔
UNCOV
2943
        uError("tmq writeRawImpl init error:%d", code);
×
UNCOV
2944
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
UNCOV
2945
        return code;
×
2946
      }
2947
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
3,100✔
2948
    }
2949
  }
2950

2951
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
35,741✔
UNCOV
2952
    return TSDB_CODE_INTERNAL_ERROR;
×
2953
  }
2954
  return 0;
35,741✔
2955
}
2956

2957
// Apply one raw TMQ payload to the connection, choosing the writer by type.
//
// type is either a TDMT_VND_* message type or a RES_TYPE__TMQ* result type.
// Both TDMT_VND_CREATE_STB and TDMT_VND_ALTER_STB are routed to taosCreateStb().
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments or an unknown type,
// TSDB_CODE_INTERNAL_ERROR when writeRawInit() fails, otherwise the result of
// the selected writer.
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
  if (buf == NULL || taos == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  if (writeRawInit() != 0) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  switch (type) {
    case TDMT_VND_CREATE_STB:
    case TDMT_VND_ALTER_STB:
      return taosCreateStb(taos, buf, len);
    case TDMT_VND_DROP_STB:
      return taosDropStb(taos, buf, len);
    case TDMT_VND_CREATE_TABLE:
      return taosCreateTable(taos, buf, len);
    case TDMT_VND_ALTER_TABLE:
      return taosAlterTable(taos, buf, len);
    case TDMT_VND_DROP_TABLE:
      return taosDropTable(taos, buf, len);
    case TDMT_VND_DELETE:
      return taosDeleteData(taos, buf, len);
    case RES_TYPE__TMQ_METADATA:
      return tmqWriteRawMetaDataImpl(taos, buf, len);
    case RES_TYPE__TMQ_RAWDATA:
      return tmqWriteRawRawDataImpl(taos, buf, len);
    case RES_TYPE__TMQ:
      return tmqWriteRawDataImpl(taos, buf, len);
    case RES_TYPE__TMQ_BATCH_META:
      return tmqWriteBatchMetaDataImpl(taos, buf, len);
    default:
      return TSDB_CODE_INVALID_PARA;
  }
}
2991

2992
// Public entry point: write a raw TMQ payload through the given connection.
//
// Rejects a NULL connection, a NULL buffer or a zero length with
// TSDB_CODE_INVALID_PARA (and records an error message). Otherwise clears the
// global error message and returns the result of writeRawImpl().
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
  if (taos != NULL && raw.raw != NULL && raw.raw_len > 0) {
    taosClearErrMsg();  // clear global error message
    return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
  }

  SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
  return TSDB_CODE_INVALID_PARA;
}
3001

3002
// Write every meta message contained in an encoded batch meta response.
//
// meta/metaLen is decoded into an SMqBatchMetaRsp. Each entry is an SMqRspHead
// followed by an encoded SMqMetaRsp; the entry is decoded and its payload is
// passed to writeRawImpl() with the entry's own resMsgType. Processing stops at
// the first failure.
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, TSDB_CODE_TMQ_INVALID_MSG
// for an entry shorter than its head, otherwise 0 or the first error code.
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SMqBatchMetaRsp rsp = {0};
  SDecoder        coder = {0};
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;

  RAW_LOG_START
  // decode and process req
  tDecoderInit(&coder, meta, metaLen);
  RAW_RETURN_CHECK(tDecodeMqBatchMetaRsp(&coder, &rsp));
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // The payload starts after the response head; a shorter entry would make
    // the unsigned subtraction below wrap to a huge length.
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      code = TSDB_CODE_TMQ_INVALID_MSG;
      lino = __LINE__;
      goto end;
    }

    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    code = tDecodeMqMetaRsp(&metaCoder, &metaRsp);
    if (code == 0) {
      code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
    }
    // Release the per-entry resources whether or not the entry succeeded.
    tDeleteMqMetaRsp(&metaRsp);
    tDecoderClear(&metaCoder);
    if (code != TSDB_CODE_SUCCESS) {
      lino = __LINE__;  // let RAW_LOG_END report where the batch stopped
      goto end;
    }
  }

end:
  tDeleteMqBatchMetaRsp(&rsp);
  tDecoderClear(&coder);
  RAW_LOG_END
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc