• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4979

09 Mar 2026 09:42AM UTC coverage: 68.512% (+0.07%) from 68.439%
#4979

push

travis-ci

web-flow
doc: update user manual. (#34693)

211961 of 309380 relevant lines covered (68.51%)

134839524.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.71
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <string.h>
17
#include "cJSON.h"
18
#include "clientInt.h"
19
#include "osMemPool.h"
20
#include "parser.h"
21
#include "taosdef.h"
22
#include "tarray.h"
23
#include "tcol.h"
24
#include "tcompression.h"
25
#include "tdatablock.h"
26
#include "tdataformat.h"
27
#include "tdef.h"
28
#include "tglobal.h"
29
#include "tmsgtype.h"
30

31
// Logs the outcome of the enclosing function. Expects `code` and `lino` to be in scope.
// Deliberately not wrapped in do/while: call sites use it without a trailing semicolon.
#define RAW_LOG_END                                                           \
  if (code != 0) {                                                            \
    uError("%s failed at line:%d since:%s", __func__, lino, tstrerror(code)); \
  } else {                                                                    \
    uDebug("%s return success", __func__);                                    \
  }

// Logs function entry. The expansion already ends with ';'.
#define RAW_LOG_START uDebug("%s start", __func__);

// If `c` evaluates to NULL: record the line, copy terrno into `code` and jump to the local `end` label.
// The argument is parenthesized so expressions such as `a ? b : c` compare as a whole.
#define RAW_NULL_CHECK(c) \
  do {                    \
    if ((c) == NULL) {    \
      lino = __LINE__;    \
      code = terrno;      \
      goto end;           \
    }                     \
  } while (0)

// If `c` is false: set `code` to TSDB_CODE_INVALID_PARA, record the line and jump to `end`.
#define RAW_FALSE_CHECK(c)           \
  do {                               \
    if (!(c)) {                      \
      code = TSDB_CODE_INVALID_PARA; \
      lino = __LINE__;               \
      goto end;                      \
    }                                \
  } while (0)

// Stores the value of `c` in `code`; when it is non-zero, records the line and jumps to `end`.
// The argument is parenthesized so the whole expression is assigned.
#define RAW_RETURN_CHECK(c) \
  do {                      \
    code = (c);             \
    if (code != 0) {        \
      lino = __LINE__;      \
      goto end;             \
    }                       \
  } while (0)

// Log prefix helpers; LOG_ID_VALUE expects `taos` and `pRequest` to be in scope at the use site.
#define LOG_ID_TAG   "connId:0x%" PRIx64 ", QID:0x%" PRIx64
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId

#define TMQ_META_VERSION "1.0"
71

72
/*
 * Creates an empty JSON object and appends it to `array`.
 * Returns the appended object, or NULL when cJSON_AddItemToArray reports failure
 * (the freshly created object is deleted in that case).
 */
static cJSON* tmqAddObjectToArray(cJSON* array) {
  cJSON* obj = cJSON_CreateObject();
  if (!cJSON_AddItemToArray(array, obj)) {
    cJSON_Delete(obj);
    return NULL;
  }
  return obj;
}
80

81
/*
 * Creates a JSON string item from `str` and appends it to `array`.
 * Returns the appended item, or NULL when cJSON_AddItemToArray reports failure
 * (the freshly created item is deleted in that case).
 */
static cJSON* tmqAddStringToArray(cJSON* array, const char* str) {
  cJSON* strItem = cJSON_CreateString(str);
  if (!cJSON_AddItemToArray(array, strItem)) {
    cJSON_Delete(strItem);
    return NULL;
  }
  return strItem;
}
89
 
90
/*
 * Add one member (string / bool / number) named NAME to the cJSON object JSON.
 * Each helper goes through RAW_NULL_CHECK, so a NULL result from the cJSON call jumps to the
 * enclosing function's `end` label. The expansion already ends with ';', so a call site
 * compiles with or without a trailing semicolon of its own.
 */
#define ADD_TO_JSON_STRING(JSON, NAME, VALUE) RAW_NULL_CHECK(cJSON_AddStringToObject(JSON, NAME, VALUE));

#define ADD_TO_JSON_BOOL(JSON, NAME, VALUE) RAW_NULL_CHECK(cJSON_AddBoolToObject(JSON, NAME, VALUE));

#define ADD_TO_JSON_NUMBER(JSON, NAME, VALUE) RAW_NULL_CHECK(cJSON_AddNumberToObject(JSON, NAME, VALUE));
98

99
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen);
100
/*
 * Offsets `suid` by the MurmurHash3_32 hash of the database name.
 * When `db` is NULL the uid is returned unchanged.
 */
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
  return (db != NULL) ? suid + MurmurHash3_32(db, strlen(db)) : suid;
}
106

107
static int32_t getLength(int8_t type, int32_t bytes, int32_t typeMod) {
238,047✔
108
  int32_t length = 0;
238,047✔
109
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_GEOMETRY) {
238,047✔
110
    length = bytes - VARSTR_HEADER_SIZE;
32,920✔
111
  } else if (type == TSDB_DATA_TYPE_NCHAR) {
205,127✔
112
    length = (bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
25,377✔
113
  } else if (IS_STR_DATA_BLOB(type)) {
179,750✔
114
    length = bytes - BLOBSTR_HEADER_SIZE;
×
115
  } else if (type == TSDB_DATA_TYPE_DECIMAL || type == TSDB_DATA_TYPE_DECIMAL64) {
179,750✔
116
    length = typeMod;
356✔
117
  }
118
  return length;
238,047✔
119
}
120

121
/*
 * Builds the JSON description ("type":"create") of a super table or normal table.
 *
 * schemaRow   : column schema (required).
 * schemaTag   : tag schema; may be NULL, in which case the "tags" array stays empty.
 * pExtSchemas : optional per-column extended schema; only typeMod is read.
 * name        : table name (required).
 * id          : not used by this function.
 * t           : TSDB_SUPER_TABLE selects tableType "super", anything else "normal".
 * isVirtual   : when true, "isVirtual" is emitted and column references from colRef are added.
 * colRef      : optional column references, consulted only for virtual tables.
 * pColCmprRow : per-column compression settings (required). Columns without an entry
 *               (nCols <= 0, or index beyond nCols) get the type's default compression.
 * pJson       : receives the JSON object. On failure it receives whatever was built so far
 *               (possibly NULL); the caller owns it in both cases.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, SExtSchema* pExtSchemas, char* name, int64_t id, int8_t t,
                                 bool isVirtual, SColRefWrapper* colRef, SColCmprWrapper* pColCmprRow, cJSON** pJson) {
  if (schemaRow == NULL || name == NULL || pColCmprRow == NULL || pJson == NULL) {
    uError("invalid parameter, schemaRow:%p, name:%p, pColCmprRow:%p, pJson:%p", schemaRow, name, pColCmprRow, pJson);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  RAW_LOG_START
  cJSON* json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);

  ADD_TO_JSON_STRING(json, "type", "create");
  ADD_TO_JSON_STRING(json, "tableType", (t == TSDB_SUPER_TABLE ? "super" : "normal"));
  if (isVirtual) {
    ADD_TO_JSON_BOOL(json, "isVirtual", isVirtual);
  }
  ADD_TO_JSON_STRING(json, "tableName", name);

  cJSON* columns = cJSON_AddArrayToObject(json, "columns");
  RAW_NULL_CHECK(columns);

  for (int i = 0; i < schemaRow->nCols; i++) {
    cJSON* column = tmqAddObjectToArray(columns);
    RAW_NULL_CHECK(column);
    SSchema* s = schemaRow->pSchema + i;
    ADD_TO_JSON_STRING(column, "name", s->name);
    ADD_TO_JSON_NUMBER(column, "type", s->type);
    int32_t typeMod = 0;
    if (pExtSchemas != NULL) {
      typeMod = pExtSchemas[i].typeMod;
    }
    int32_t length = getLength(s->type, s->bytes, typeMod);
    if (length > 0) {
      ADD_TO_JSON_NUMBER(column, "length", length);
    }

    if (isVirtual && colRef != NULL && i < colRef->nCols) {
      SColRef* pColRef = colRef->pColRef + i;
      if (pColRef->hasRef) {
        cJSON* ref = cJSON_AddObjectToObject(column, "ref");
        RAW_NULL_CHECK(ref);
        ADD_TO_JSON_STRING(ref, "refDbName", pColRef->refDbName);
        ADD_TO_JSON_STRING(ref, "refTableName", pColRef->refTableName);
        ADD_TO_JSON_STRING(ref, "refColName", pColRef->refColName);
      }
    }
    ADD_TO_JSON_BOOL(column, "isPrimarykey", (s->flags & COL_IS_KEY));

    // Only index pColCmpr when it really has an entry for this column; otherwise fall back to
    // the default for the column type, exactly as is done when no settings are supplied at all.
    uint32_t alg = 0;
    if (pColCmprRow->nCols <= 0 || i >= pColCmprRow->nCols) {
      alg = createDefaultColCmprByType(s->type);
    } else {
      alg = pColCmprRow->pColCmpr[i].alg;
    }
    const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(alg));
    RAW_NULL_CHECK(encode);
    const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(alg));
    RAW_NULL_CHECK(compress);
    const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(alg));
    RAW_NULL_CHECK(level);

    ADD_TO_JSON_STRING(column, "encode", encode);
    ADD_TO_JSON_STRING(column, "compress", compress);
    ADD_TO_JSON_STRING(column, "level", level);
  }

  cJSON* tags = cJSON_AddArrayToObject(json, "tags");
  RAW_NULL_CHECK(tags);

  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
    cJSON* tag = tmqAddObjectToArray(tags);
    RAW_NULL_CHECK(tag);
    SSchema* s = schemaTag->pSchema + i;
    ADD_TO_JSON_STRING(tag, "name", s->name);
    ADD_TO_JSON_NUMBER(tag, "type", s->type);
    // Tags carry no extended schema here, so typeMod is always 0.
    int32_t length = getLength(s->type, s->bytes, 0);
    if (length > 0) {
      ADD_TO_JSON_NUMBER(tag, "length", length);
    }
  }

end:
  *pJson = json;
  RAW_LOG_END
  return code;
}
217

218
/*
 * Adds exactly one compression attribute to `json`, decoded from the packed value `para`.
 * The parts are examined in the order encode, compress, level; the first non-zero one is
 * written and the rest are ignored. If all are zero nothing is added.
 *
 * Returns TSDB_CODE_INVALID_PARA when `json` is NULL, 0 on success, otherwise the code set by
 * the failing check.
 */
static int32_t setCompressOption(cJSON* json, uint32_t para) {
  if (json == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  uint8_t encodeId = COMPRESS_L1_TYPE_U32(para);
  RAW_LOG_START

  if (encodeId != 0) {
    const char* text = columnEncodeStr(encodeId);
    RAW_NULL_CHECK(text);
    ADD_TO_JSON_STRING(json, "encode", text);
  } else {
    uint8_t compressId = COMPRESS_L2_TYPE_U32(para);
    if (compressId != 0) {
      const char* text = columnCompressStr(compressId);
      RAW_NULL_CHECK(text);
      ADD_TO_JSON_STRING(json, "compress", text);
    } else {
      uint8_t levelId = COMPRESS_L2_TYPE_LEVEL_U32(para);
      if (levelId != 0) {
        const char* text = columnLevelStr(levelId);
        RAW_NULL_CHECK(text);
        ADD_TO_JSON_STRING(json, "level", text);
      }
    }
  }

end:
  RAW_LOG_END
  return code;
}
251
/*
 * Translates a serialized super-table ALTER request into its JSON description.
 *
 * alterData / alterDataLen : buffer passed to tDeserializeSMAlterStbReq.
 * pJson                    : receives the JSON object. It is NULL when deserialization fails or
 *                            when an add / update-bytes request does not carry exactly one field;
 *                            on any other failure it holds whatever had been built so far.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) {
  if (alterData == NULL || pJson == NULL) {
    uError("invalid parameter in %s alterData:%p", __func__, alterData);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t       code = 0;
  int32_t       lino = 0;
  cJSON*        json = NULL;
  SMAlterStbReq req = {0};
  RAW_LOG_START

  RAW_RETURN_CHECK(tDeserializeSMAlterStbReq(alterData, alterDataLen, &req));

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  ADD_TO_JSON_STRING(json, "type", "alter");

  SName tbName = {0};
  RAW_RETURN_CHECK(tNameFromString(&tbName, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
  ADD_TO_JSON_STRING(json, "tableType", "super");
  ADD_TO_JSON_STRING(json, "tableName", tbName.tname);
  ADD_TO_JSON_NUMBER(json, "alterType", req.alterType);

  // Add and update-bytes requests must carry exactly one field; reject anything else up front
  // and hand back no JSON at all.
  bool singleFieldOnly =
      req.alterType == TSDB_ALTER_TABLE_ADD_TAG || req.alterType == TSDB_ALTER_TABLE_ADD_COLUMN ||
      req.alterType == TSDB_ALTER_TABLE_UPDATE_TAG_BYTES || req.alterType == TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES;
  if (singleFieldOnly && taosArrayGetSize(req.pFields) != 1) {
    uError("invalid field num %" PRIzu " for alter type %d", taosArrayGetSize(req.pFields), req.alterType);
    cJSON_Delete(json);
    json = NULL;
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  switch (req.alterType) {
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      TAOS_FIELD* pField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pField);
      ADD_TO_JSON_STRING(json, "colName", pField->name);
      ADD_TO_JSON_NUMBER(json, "colType", pField->type);

      int32_t typeMod = 0;
      if (taosArrayGetSize(req.pTypeMods) > 0) {
        typeMod = *(STypeMod*)taosArrayGet(req.pTypeMods, 0);
      }
      int32_t colLength = getLength(pField->type, pField->bytes, typeMod);
      if (colLength > 0) {
        ADD_TO_JSON_NUMBER(json, "colLength", colLength);
      }
      break;
    }

    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      SFieldWithOptions* pField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pField);
      ADD_TO_JSON_STRING(json, "colName", pField->name);
      ADD_TO_JSON_NUMBER(json, "colType", pField->type);

      int32_t typeMod = 0;
      if (taosArrayGetSize(req.pTypeMods) > 0) {
        typeMod = *(STypeMod*)taosArrayGet(req.pTypeMods, 0);
      }
      int32_t colLength = getLength(pField->type, pField->bytes, typeMod);
      if (colLength > 0) {
        ADD_TO_JSON_NUMBER(json, "colLength", colLength);
      }

      RAW_RETURN_CHECK(setCompressOption(json, pField->compress));
      break;
    }

    case TSDB_ALTER_TABLE_DROP_TAG:
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* pField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pField);
      ADD_TO_JSON_STRING(json, "colName", pField->name);
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      TAOS_FIELD* pField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pField);
      ADD_TO_JSON_STRING(json, "colName", pField->name);
      ADD_TO_JSON_NUMBER(json, "colType", pField->type);

      int32_t colLength = getLength(pField->type, pField->bytes, 0);
      if (colLength > 0) {
        ADD_TO_JSON_NUMBER(json, "colLength", colLength);
      }
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      // Field 0 is the current name, field 1 the new one.
      TAOS_FIELD* pOld = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pOld);
      TAOS_FIELD* pNew = taosArrayGet(req.pFields, 1);
      RAW_NULL_CHECK(pNew);
      ADD_TO_JSON_STRING(json, "colName", pOld->name);
      ADD_TO_JSON_STRING(json, "colNewName", pNew->name);
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      TAOS_FIELD* pField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pField);
      ADD_TO_JSON_STRING(json, "colName", pField->name);
      // For this alter type the field's `bytes` member is what gets decoded as the compress option.
      RAW_RETURN_CHECK(setCompressOption(json, pField->bytes));
      break;
    }

    default:
      break;
  }

end:
  tFreeSMAltertbReq(&req);
  *pJson = json;
  RAW_LOG_END
  return code;
}
368

369
/*
 * Decodes a create-super-table request from the meta response payload (the bytes after the
 * SMsgHead) and converts it to JSON through buildCreateTableJson.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, 0 on success, otherwise the error code
 * of the decode or build step.
 */
static int32_t processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  SDecoder       coder = {0};
  SVCreateStbReq stbReq = {0};
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;
  RAW_LOG_START

  // The request body starts right after the message head.
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, body, bodyLen);

  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&coder, &stbReq));
  RAW_RETURN_CHECK(buildCreateTableJson(&stbReq.schemaRow, &stbReq.schemaTag, stbReq.pExtSchemas, stbReq.name,
                                        stbReq.suid, TSDB_SUPER_TABLE, stbReq.virtualStb, NULL, &stbReq.colCmpr,
                                        pJson));

end:
  tDecoderClear(&coder);
  RAW_LOG_END
  return code;
}
394

395
/*
 * Decodes the meta response payload (the bytes after the SMsgHead) as an SVCreateStbReq and
 * converts the original alter request it carries (alterOriData) to JSON.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, 0 on success, otherwise the error code
 * of the decode or build step.
 */
static int32_t processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;
  SDecoder       coder = {0};
  SVCreateStbReq stbReq = {0};
  RAW_LOG_START

  // The request body starts right after the message head.
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, body, bodyLen);

  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&coder, &stbReq));
  RAW_RETURN_CHECK(buildAlterSTableJson(stbReq.alterOriData, stbReq.alterOriDataLen, pJson));

end:
  tDecoderClear(&coder);
  RAW_LOG_END
  return code;
}
419

420
/*
 * Fills `json` with the description of one child table taken from `pCreateReq`:
 * "tableName", "refs" (virtual child tables only), "using", "tagNum" and "tags".
 *
 * Tag handling:
 *   - a JSON tag set is emitted as one entry of type TSDB_DATA_TYPE_JSON whose value is the
 *     text produced by parseTagDatatoJson; an empty JSON tag set leaves "tags" empty;
 *   - a blob-typed tag stops tag processing at that tag and the function returns success;
 *   - other variable-length tags are converted with dataConverToStr;
 *   - all remaining tags are emitted as numbers.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, 0 on success, otherwise the error code
 * of the failing step.
 */
static int32_t buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
  if (json == NULL || pCreateReq == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  int32_t lino = 0;
  STag*   pTag = (STag*)pCreateReq->ctb.pTag;
  SArray* tagNames = pCreateReq->ctb.tagName;
  uint8_t tagNum = pCreateReq->ctb.tagNum;
  SArray* pTagVals = NULL;
  char*   jsonTagText = NULL;
  char*   valueBuf = NULL;
  RAW_LOG_START

  ADD_TO_JSON_STRING(json, "tableName", pCreateReq->name);

  if (pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) {
    cJSON* refs = cJSON_AddArrayToObject(json, "refs");
    RAW_NULL_CHECK(refs);

    for (int i = 0; i < pCreateReq->colRef.nCols; i++) {
      SColRef* pColRef = &pCreateReq->colRef.pColRef[i];
      if (pColRef->hasRef) {
        cJSON* ref = tmqAddObjectToArray(refs);
        RAW_NULL_CHECK(ref);
        ADD_TO_JSON_STRING(ref, "colName", pColRef->colName);
        ADD_TO_JSON_STRING(ref, "refDbName", pColRef->refDbName);
        ADD_TO_JSON_STRING(ref, "refTableName", pColRef->refTableName);
        ADD_TO_JSON_STRING(ref, "refColName", pColRef->refColName);
      }
    }
  }

  ADD_TO_JSON_STRING(json, "using", pCreateReq->ctb.stbName);
  ADD_TO_JSON_NUMBER(json, "tagNum", tagNum);

  cJSON* tags = cJSON_AddArrayToObject(json, "tags");
  RAW_NULL_CHECK(tags);
  RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals));

  if (tTagIsJson(pTag)) {
    if (pTag->nTag == 0) {
      uWarn("p->nTag == 0");
      goto end;
    }
    parseTagDatatoJson(pTag, &jsonTagText, NULL);
    RAW_NULL_CHECK(jsonTagText);
    cJSON* tag = tmqAddObjectToArray(tags);
    RAW_NULL_CHECK(tag);
    STagVal* pFirstVal = taosArrayGet(pTagVals, 0);
    RAW_NULL_CHECK(pFirstVal);
    char* firstName = taosArrayGet(tagNames, 0);
    RAW_NULL_CHECK(firstName);
    ADD_TO_JSON_STRING(tag, "name", firstName);
    ADD_TO_JSON_NUMBER(tag, "type", TSDB_DATA_TYPE_JSON);
    ADD_TO_JSON_STRING(tag, "value", jsonTagText);
    goto end;
  }

  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
    RAW_NULL_CHECK(pTagVal);
    cJSON* tag = tmqAddObjectToArray(tags);
    RAW_NULL_CHECK(tag);
    char* tagNameStr = taosArrayGet(tagNames, i);
    RAW_NULL_CHECK(tagNameStr);
    ADD_TO_JSON_STRING(tag, "name", tagNameStr);
    ADD_TO_JSON_NUMBER(tag, "type", pTagVal->type);

    if (!IS_VAR_DATA_TYPE(pTagVal->type)) {
      double val = 0;
      GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64, 0);  // currently tag type can't be decimal, so pass 0 as typeMod
      ADD_TO_JSON_NUMBER(tag, "value", val);
      continue;
    }

    if (IS_STR_DATA_BLOB(pTagVal->type)) {
      goto end;
    }

    // VARBINARY gets 2 * nData + 2 + 3 bytes, every other variable-length type nData + 3.
    int64_t bufSize = (pTagVal->type == TSDB_DATA_TYPE_VARBINARY) ? pTagVal->nData * 2 + 2 + 3 : pTagVal->nData + 3;
    valueBuf = taosMemoryCalloc(bufSize, 1);
    RAW_NULL_CHECK(valueBuf);
    code = dataConverToStr(valueBuf, bufSize, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      uError("convert tag value to string failed");
      goto end;
    }

    ADD_TO_JSON_STRING(tag, "value", valueBuf);
    taosMemoryFreeClear(valueBuf);
  }

end:
  taosMemoryFree(jsonTagText);
  taosArrayDestroy(pTagVals);
  taosMemoryFree(valueBuf);
  RAW_LOG_END
  return code;
}
529

530
/*
 * Builds the JSON description ("type":"create", "tableType":"child") for a batch of child
 * table create requests.
 *
 * The first request is written into the root object. A "createList" array is always added;
 * it is populated (with every request, the first included) only when nReqs > 1.
 *
 * pJson receives the JSON object; on failure it receives whatever was built so far
 * (possibly NULL), and the caller owns it in both cases.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, 0 on success, otherwise the error code
 * of the failing step.
 */
static int32_t buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) {
  if (pJson == NULL || pCreateReq == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = 0;
  RAW_LOG_START
  cJSON* root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);
  ADD_TO_JSON_STRING(root, "type", "create");
  ADD_TO_JSON_STRING(root, "tableType", "child");

  RAW_RETURN_CHECK(buildChildElement(root, pCreateReq));
  cJSON* createList = cJSON_AddArrayToObject(root, "createList");
  RAW_NULL_CHECK(createList);

  if (nReqs > 1) {
    for (int idx = 0; idx < nReqs; idx++) {
      cJSON* entry = tmqAddObjectToArray(createList);
      RAW_NULL_CHECK(entry);
      RAW_RETURN_CHECK(buildChildElement(entry, &pCreateReq[idx]));
    }
  }

end:
  *pJson = root;
  RAW_LOG_END
  return code;
}
559

560
/*
 * Decodes a batch create-table request from the meta response payload (the bytes after the
 * SMsgHead) and converts it to JSON.
 *
 * The type of the first request selects the builder: child / virtual child tables go through
 * buildCreateCTableJson with the whole batch, normal / virtual normal tables go through
 * buildCreateTableJson with the first request only. An empty batch or any other table type
 * leaves *pJson untouched.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, 0 on success, otherwise the error code
 * of the decode or build step.
 */
static int32_t processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t            lino = 0;
  int32_t            code = TSDB_CODE_SUCCESS;
  SVCreateTbBatchReq batch = {0};
  SDecoder           decoder = {0};
  RAW_LOG_START

  // The request body starts right after the message head.
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, body, bodyLen);
  RAW_RETURN_CHECK(tDecodeSVCreateTbBatchReq(&decoder, &batch));

  if (batch.nReqs > 0) {
    SVCreateTbReq* pFirst = batch.pReqs;
    bool isChildKind = (pFirst->type == TSDB_CHILD_TABLE || pFirst->type == TSDB_VIRTUAL_CHILD_TABLE);
    bool isNormalKind = (pFirst->type == TSDB_NORMAL_TABLE || pFirst->type == TSDB_VIRTUAL_NORMAL_TABLE);

    if (isChildKind) {
      RAW_RETURN_CHECK(buildCreateCTableJson(batch.pReqs, batch.nReqs, pJson));
    } else if (isNormalKind) {
      // Within this branch, anything that is not a plain normal table is the virtual variant.
      bool isVirtual = (pFirst->type != TSDB_NORMAL_TABLE);
      RAW_RETURN_CHECK(buildCreateTableJson(&pFirst->ntb.schemaRow, NULL, pFirst->pExtSchemas, pFirst->name,
                                            pFirst->uid, pFirst->type, isVirtual, &pFirst->colRef, &pFirst->colCmpr,
                                            pJson));
    }
  }

end:
  tDeleteSVCreateTbBatchReq(&batch);
  tDecoderClear(&decoder);
  RAW_LOG_END
  return code;
}
594

595
/*
 * Converts the auto-create-table requests attached to a data response into one JSON string.
 *
 * rsp    : data response; createTableNum must be > 0, and createTableReq / createTableLen
 *          supply the encoded requests and their lengths.
 * string : receives the unformatted JSON text produced by cJSON_PrintUnformatted; the buffer
 *          belongs to the caller. It is left untouched when an earlier step fails.
 *
 * Every request must be a child table or a normal table. The type of the first request decides
 * the output: a normal table is described on its own, a child table is described together with
 * the rest of the batch through buildCreateCTableJson.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for bad arguments or an unsupported table type,
 * otherwise the error code of the failing step.
 */
static int32_t processAutoCreateTable(SMqDataRsp* rsp, char** string) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  // Everything the cleanup path touches is initialised before the first check that can jump to
  // `end`; otherwise an early exit would read indeterminate pointers there.
  SDecoder*      decoder = NULL;
  SVCreateTbReq* pCreateReq = NULL;
  cJSON*         pJson = NULL;
  RAW_LOG_START
  RAW_FALSE_CHECK(rsp != NULL && string != NULL);
  RAW_FALSE_CHECK(rsp->createTableNum > 0);

  decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
  RAW_NULL_CHECK(decoder);
  pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
  RAW_NULL_CHECK(pCreateReq);

  // loop to create table
  for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) {
    // decode
    void** data = taosArrayGet(rsp->createTableReq, iReq);
    RAW_NULL_CHECK(data);
    int32_t* len = taosArrayGet(rsp->createTableLen, iReq);
    RAW_NULL_CHECK(len);
    tDecoderInit(&decoder[iReq], *data, *len);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq));

    if (pCreateReq[iReq].type != TSDB_CHILD_TABLE && pCreateReq[iReq].type != TSDB_NORMAL_TABLE) {
      uError("%s failed. pCreateReq[iReq].type:%d invalid", __func__, pCreateReq[iReq].type);
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }
  }

  // The builders hand back a (possibly partial) object even when they fail, so pJson is
  // released on the common cleanup path rather than here.
  if (pCreateReq->type == TSDB_NORMAL_TABLE) {
    RAW_RETURN_CHECK(buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->pExtSchemas, pCreateReq->name,
                                          pCreateReq->uid, TSDB_NORMAL_TABLE, false, NULL, &pCreateReq->colCmpr, &pJson));
  } else if (pCreateReq->type == TSDB_CHILD_TABLE) {
    RAW_RETURN_CHECK(buildCreateCTableJson(pCreateReq, rsp->createTableNum, &pJson));
  }

  *string = cJSON_PrintUnformatted(pJson);
  // Printing can fail; do not pass a NULL string to the "%s" log below.
  RAW_NULL_CHECK(*string);

  uDebug("auto created table return, sql json:%s", *string);

end:
  RAW_LOG_END
  cJSON_Delete(pJson);
  // decoder is non-NULL only after rsp was validated, so rsp is safe to dereference here.
  for (int i = 0; decoder && pCreateReq && i < rsp->createTableNum; i++) {
    tDecoderClear(&decoder[i]);
    taosMemoryFreeClear(pCreateReq[i].comment);
    if (pCreateReq[i].type == TSDB_CHILD_TABLE) {
      taosArrayDestroy(pCreateReq[i].ctb.tagName);
    }
  }
  taosMemoryFree(decoder);
  taosMemoryFree(pCreateReq);
  return code;
}
651

652
// Translate an "alter table" meta message into a cJSON description.
//
// The payload that follows the SMsgHead in metaRsp->metaRsp is decoded as an SVAlterTbReq. The
// resulting object always carries "type", "tableType", "tableName" and "alterType"; the remaining
// fields depend on the alter action.
//
// metaRsp  meta response holding the encoded request; must not be NULL
// pJson    receives the JSON object; must not be NULL. It is assigned on every path past the
//          argument check, so on failure it may be NULL or only partially populated.
// returns  0 on success, otherwise an error code
static int32_t processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  cJSON*       json = NULL;
  int32_t      code = 0;
  int32_t      lino = 0;
  char*        buf = NULL;   // textual tag value for the single-tag action, freed at end
  char*        buf1 = NULL;  // textual tag value for the multi-tag action, freed per iteration and at end

  RAW_LOG_START

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  RAW_RETURN_CHECK(tDecodeSVAlterTbReq(&decoder, &vAlterTbReq));

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  ADD_TO_JSON_STRING(json, "type", "alter");

  // Tag updates are reported as "child", column-reference changes carry an empty table type,
  // everything else is reported as "normal".
  char* tableType = NULL;
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
      vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
    tableType = "child";
  } else if (vAlterTbReq.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF ||
             vAlterTbReq.action == TSDB_ALTER_TABLE_REMOVE_COLUMN_REF) {
    tableType = "";
  } else {
    tableType = "normal";
  }

  ADD_TO_JSON_STRING(json, "tableType", tableType);
  ADD_TO_JSON_STRING(json, "tableName", vAlterTbReq.tbName);
  ADD_TO_JSON_NUMBER(json, "alterType", vAlterTbReq.action);

  uDebug("alter table action:%d", vAlterTbReq.action);
  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN:
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      ADD_TO_JSON_NUMBER(json, "colType", vAlterTbReq.type);

      // "colLength" is only emitted when getLength reports a positive length
      int32_t length = getLength(vAlterTbReq.type, vAlterTbReq.bytes, vAlterTbReq.typeMod);
      if (length > 0) {
        ADD_TO_JSON_NUMBER(json, "colLength", length);
      }

      if (vAlterTbReq.action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF) {
        ADD_TO_JSON_STRING(json, "refDbName", vAlterTbReq.refDbName);
        ADD_TO_JSON_STRING(json, "refTbName", vAlterTbReq.refTbName);
        ADD_TO_JSON_STRING(json, "refColName", vAlterTbReq.refColName);
      }
      break;
    }
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      ADD_TO_JSON_NUMBER(json, "colType", vAlterTbReq.type);

      int32_t length = getLength(vAlterTbReq.type, vAlterTbReq.bytes, vAlterTbReq.typeMod);
      if (length > 0) {
        ADD_TO_JSON_NUMBER(json, "colLength", length);
      }
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      ADD_TO_JSON_NUMBER(json, "colType", vAlterTbReq.colModType);
      int32_t length = getLength(vAlterTbReq.colModType, vAlterTbReq.colModBytes, vAlterTbReq.typeMod);
      if (length > 0) {
        ADD_TO_JSON_NUMBER(json, "colLength", length);
      }
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      ADD_TO_JSON_STRING(json, "colNewName", vAlterTbReq.colNewName);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.tagName);

      bool isNull = vAlterTbReq.isNull;
      // Only look inside the JSON tag when the request says it is not NULL: a request already
      // flagged as NULL may carry no value pointer at all, and dereferencing it would crash.
      if (!isNull && vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        const STag* jsonTag = (const STag*)vAlterTbReq.pTagVal;
        if (jsonTag == NULL) {
          code = TSDB_CODE_INVALID_PARA;
          uError("processAlterTable json tag value is NULL");
          goto end;
        }
        // a JSON tag without any entries is reported as NULL
        if (jsonTag->nTag == 0) isNull = true;
      }
      if (!isNull) {
        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          if (!tTagIsJson(vAlterTbReq.pTagVal)) {
            code = TSDB_CODE_INVALID_PARA;
            uError("processAlterTable isJson false");
            goto end;
          }
          parseTagDatatoJson(vAlterTbReq.pTagVal, &buf, NULL);
          if (buf == NULL) {
            code = TSDB_CODE_INVALID_PARA;
            uError("parseTagDatatoJson failed, buf == NULL");
            goto end;
          }
        } else {
          int64_t bufSize = 0;
          if (vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY) {
            bufSize = vAlterTbReq.nTagVal * 2 + 2 + 3;
          } else {
            bufSize = vAlterTbReq.nTagVal + 32;
          }
          buf = taosMemoryCalloc(bufSize, 1);
          RAW_NULL_CHECK(buf);
          code = dataConverToStr(buf, bufSize, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL);
          if (code != TSDB_CODE_SUCCESS) {
            uError("convert tag value to string failed");
            goto end;
          }
        }

        ADD_TO_JSON_STRING(json, "colValue", buf);
      }

      ADD_TO_JSON_BOOL(json, "colValueNull", isNull);
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: {
      int32_t nTags = taosArrayGetSize(vAlterTbReq.pMultiTag);
      if (nTags <= 0) {
        code = TSDB_CODE_INVALID_PARA;
        uError("processAlterTable parse multi tags error");
        goto end;
      }

      cJSON* tags = cJSON_AddArrayToObject(json, "tags");
      RAW_NULL_CHECK(tags);

      for (int32_t i = 0; i < nTags; i++) {
        cJSON* member = tmqAddObjectToArray(tags);
        RAW_NULL_CHECK(member);

        SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
        RAW_NULL_CHECK(pTagVal);
        ADD_TO_JSON_STRING(member, "colName", pTagVal->tagName);

        // JSON tags are rejected in the multi-tag form
        if (pTagVal->tagType == TSDB_DATA_TYPE_JSON) {
          code = TSDB_CODE_INVALID_PARA;
          uError("processAlterTable isJson false");
          goto end;
        }
        bool isNull = pTagVal->isNull;
        if (!isNull) {
          int64_t bufSize = 0;
          if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
            bufSize = pTagVal->nTagVal * 2 + 2 + 3;
          } else {
            // Same headroom as the single-tag path: the text of a fixed-width value (for example
            // an 8-byte integer) can be much longer than its binary size.
            bufSize = pTagVal->nTagVal + 32;
          }
          buf1 = taosMemoryCalloc(bufSize, 1);
          RAW_NULL_CHECK(buf1);
          code = dataConverToStr(buf1, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL);
          if (code != TSDB_CODE_SUCCESS) {
            uError("convert tag value to string failed");
            goto end;
          }
          ADD_TO_JSON_STRING(member, "colValue", buf1);
          taosMemoryFreeClear(buf1);
        }
        ADD_TO_JSON_BOOL(member, "colValueNull", isNull);
      }
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      break;
    }
    case TSDB_ALTER_TABLE_ALTER_COLUMN_REF: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      ADD_TO_JSON_STRING(json, "refDbName", vAlterTbReq.refDbName);
      ADD_TO_JSON_STRING(json, "refTbName", vAlterTbReq.refTbName);
      ADD_TO_JSON_STRING(json, "refColName", vAlterTbReq.refColName);
      break;
    }
    case TSDB_ALTER_TABLE_REMOVE_COLUMN_REF: {
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
      break;
    }
    default:
      break;
  }

end:
  // the multi-tag array is released here; the other decoded fields are left to tDecoderClear
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
    taosArrayDestroy(vAlterTbReq.pMultiTag);
  }
  tDecoderClear(&decoder);
  taosMemoryFree(buf);
  taosMemoryFree(buf1);
  *pJson = json;
  RAW_LOG_END
  return code;
}
861

862
// Build the JSON description of a "drop super table" meta message.
//
// metaRsp  meta response whose payload (after the SMsgHead) is an encoded SVDropStbReq
// pJson    receives the JSON object; assigned on every path past the argument check
// returns  0 on success, otherwise an error code
static int32_t processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t      code = 0;
  int32_t      lino = 0;
  cJSON*       json = NULL;
  SVDropStbReq dropReq = {0};
  SDecoder     coder = {0};
  RAW_LOG_START

  // the encoded request sits right behind the message head
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, payload, payloadLen);
  RAW_RETURN_CHECK(tDecodeSVDropStbReq(&coder, &dropReq));

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  ADD_TO_JSON_STRING(json, "type", "drop");
  ADD_TO_JSON_STRING(json, "tableType", "super");
  ADD_TO_JSON_STRING(json, "tableName", dropReq.name);

end:
  tDecoderClear(&coder);
  *pJson = json;
  RAW_LOG_END
  return code;
}
892
// Build the JSON description of a "delete" meta message.
//
// The decoded SDeleteRes is rendered as a
//   delete from `<table>` where `<tsCol>` >= <skey> and `<tsCol>` <= <ekey>
// statement, stored under the "sql" key next to "type":"delete".
//
// metaRsp  meta response whose payload (after the SMsgHead) is an encoded SDeleteRes
// pJson    receives the JSON object; assigned on every path past the argument check
// returns  0 on success, otherwise an error code
static int32_t processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  cJSON*     json = NULL;
  char*      heapSql = NULL;  // only used when the statement does not fit into the stack buffer
  int32_t    code = 0;
  int32_t    lino = 0;
  RAW_LOG_START

  // decode and process req
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);

  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeDeleteRes(&coder, &req));

  char  stackSql[256] = {0};
  char* sql = stackSql;
  // snprintf returns the length the full statement needs, which lets us detect truncation
  int32_t sqlLen = snprintf(stackSql, sizeof(stackSql),
                            "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName,
                            req.tsColName, req.skey, req.tsColName, req.ekey);
  if (sqlLen < 0) {
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto end;
  }
  if ((size_t)sqlLen >= sizeof(stackSql)) {
    // Long table/column names overflow the fixed buffer; emitting a truncated statement would
    // hand the consumer invalid SQL, so format again into a buffer of the exact size.
    heapSql = taosMemoryCalloc((int64_t)sqlLen + 1, 1);
    RAW_NULL_CHECK(heapSql);
    (void)snprintf(heapSql, (size_t)sqlLen + 1, "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                   req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
    sql = heapSql;
  }

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  ADD_TO_JSON_STRING(json, "type", "delete");
  ADD_TO_JSON_STRING(json, "sql", sql);

end:
  taosMemoryFree(heapSql);
  tDecoderClear(&coder);
  *pJson = json;
  RAW_LOG_END
  return code;
}
926

927
// Build the JSON description of a batched "drop table" meta message.
//
// The result has "type":"drop" and a "tableNameList" array holding the name of every request in
// the decoded SVDropTbBatchReq.
//
// metaRsp  meta response whose payload (after the SMsgHead) is an encoded SVDropTbBatchReq
// pJson    receives the JSON object; assigned on every path past the argument check
// returns  0 on success, otherwise an error code
static int32_t processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t          code = 0;
  int32_t          lino = 0;
  cJSON*           json = NULL;
  SVDropTbBatchReq batchReq = {0};
  SDecoder         coder = {0};
  RAW_LOG_START

  // the encoded batch sits right behind the message head
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, payload, payloadLen);
  RAW_RETURN_CHECK(tDecodeSVDropTbBatchReq(&coder, &batchReq));

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  ADD_TO_JSON_STRING(json, "type", "drop");

  cJSON* nameList = cJSON_AddArrayToObject(json, "tableNameList");
  RAW_NULL_CHECK(nameList);

  // one array entry per dropped table
  int32_t idx = 0;
  while (idx < batchReq.nReqs) {
    SVDropTbReq* pOne = &batchReq.pReqs[idx];
    RAW_NULL_CHECK(tmqAddStringToArray(nameList, pOne->name));
    idx++;
  }

end:
  tDecoderClear(&coder);
  *pJson = json;
  RAW_LOG_END
  return code;
}
962

963
// Replay a raw "create super table" meta message on the connection `taos`.
//
// The SVCreateStbReq found after the SMsgHead in `meta` is decoded, converted into an
// SMCreateStbReq (TDMT_MND_CREATE_STB) and executed synchronously. When the request succeeds the
// table's cached meta is removed from the catalog.
//
// taos     connection handle; must not be NULL
// meta     raw message starting with an SMsgHead; must not be NULL
// metaLen  total length of `meta`; must cover at least the SMsgHead
// returns  0 on success, otherwise an error code
static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen is unsigned: a buffer shorter than the header would make the payload length computed
  // below wrap around to a huge value.
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateStbReq req = {0};
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SRequestObj*   pRequest = NULL;
  SCmdMsgInfo    pCmdMsg = {0};
  RAW_LOG_START

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&coder, &req));

  // a request without per-column compression info gets the per-type default for every column
  int8_t           createDefaultCompress = 0;
  SColCmprWrapper* p = &req.colCmpr;
  if (p->nCols == 0) {
    createDefaultCompress = 1;
  }
  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
  RAW_NULL_CHECK(pReq.pColumns);
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema*          pSchema = req.schemaRow.pSchema + i;
    SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);

    if (createDefaultCompress) {
      field.compress = createDefaultColCmprByType(pSchema->type);
    } else {
      SColCmpr* pCmp = &req.colCmpr.pColCmpr[i];
      field.compress = pCmp->alg;
    }
    if (req.pExtSchemas) field.typeMod = req.pExtSchemas[i].typeMod;
    RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field));
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  RAW_NULL_CHECK(pReq.pTags);
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
    RAW_NULL_CHECK(taosArrayPush(pReq.pTags, &field));
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = processSuid(req.suid, pRequest->pDb);
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;
  pReq.virtualStb = req.virtualStb;

  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // first call measures the serialized size, second call fills the buffer
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  RAW_FALSE_CHECK(tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // drop the cached meta of this table from the catalog
    SCatalog* pCatalog = NULL;
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  taosMemoryFree(pCmdMsg.pMsg);
  RAW_LOG_END
  return code;
}
1071

1072
// Replay a raw "drop super table" meta message on the connection `taos`.
//
// The SVDropStbReq found after the SMsgHead in `meta` is decoded, the table's uid is looked up
// through the catalog, and an SMDropStbReq (TDMT_MND_DROP_STB) is executed synchronously. A super
// table that does not exist is not an error: the drop is skipped and 0 is returned.
//
// taos     connection handle; must not be NULL
// meta     raw message starting with an SMsgHead; must not be NULL
// metaLen  total length of `meta`; must cover at least the SMsgHead
// returns  0 on success, otherwise an error code
static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen is unsigned: a buffer shorter than the header would make the payload length computed
  // below wrap around to a huge value.
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropStbReq req = {0};
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SRequestObj* pRequest = NULL;
  SCmdMsgInfo  pCmdMsg = {0};
  STableMeta*  pTableMeta = NULL;  // declared up front so every exit path can release it

  RAW_LOG_START
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVDropStbReq(&coder, &req));
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
  SName            pName = {0};
  toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName);
  code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    uInfo(LOG_ID_TAG " stable %s not exist, ignore drop", LOG_ID_VALUE, req.name);
    code = TSDB_CODE_SUCCESS;
    goto end;
  }
  RAW_RETURN_CHECK(code);
  // the suid sent to the mnode is the uid known to this cluster, not the one in the raw message
  pReq.suid = pTableMeta->uid;
  taosMemoryFreeClear(pTableMeta);

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;
  //  pReq.suid = processSuid(req.suid, pRequest->pDb);

  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));

  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  // first call measures the serialized size, second call fills the buffer
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  RAW_FALSE_CHECK(tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // drop the cached meta of this table; a failure here is reported to the caller
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  RAW_LOG_END
  // covers lookup failures other than "not exist", where the meta pointer was not released above
  taosMemoryFreeClear(pTableMeta);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  // same cleanup as taosCreateStb: release the serialized message if it is still held here
  taosMemoryFree(pCmdMsg.pMsg);
  return code;
}
1161

1162
typedef struct SVgroupCreateTableBatch {
1163
  SVCreateTbBatchReq req;
1164
  SVgroupInfo        info;
1165
  char               dbName[TSDB_DB_NAME_LEN];
1166
} SVgroupCreateTableBatch;
1167

1168
static void destroyCreateTbReqBatch(void* data) {
43,545✔
1169
  if (data == NULL) {
43,545✔
1170
    uError("invalid parameter in %s", __func__);
×
1171
    return;
×
1172
  }
1173
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
43,545✔
1174
  taosArrayDestroy(pTbBatch->req.pArray);
43,545✔
1175
}
1176

1177
static const SSchema* getNormalColSchema(const STableMeta* pTableMeta, const char* pColName) {
6,356✔
1178
  for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
18,369✔
1179
    const SSchema* pSchema = pTableMeta->schema + i;
18,369✔
1180
    if (0 == strcmp(pColName, pSchema->name)) {
18,369✔
1181
      return pSchema;
6,356✔
1182
    }
1183
  }
1184
  return NULL;
×
1185
}
1186

1187
// Fetch the meta of table dbName.tbName through the catalog.
//
// returns  the table meta on success (the callers in this file release it with
//          taosMemoryFreeClear); on failure NULL is returned and terrno holds the error code
static STableMeta* getTableMeta(SCatalog* pCatalog, SRequestConnInfo* conn, char* dbName, char* tbName, int32_t acctId) {
  STableMeta* pMeta = NULL;
  SName       fullName = {0};
  toName(acctId, dbName, tbName, &fullName);

  int32_t rc = catalogGetTableMeta(pCatalog, conn, &fullName, &pMeta);
  if (rc == 0) {
    return pMeta;
  }

  // lookup failed: report it, drop whatever was handed back and publish the code via terrno
  uError("failed to get table meta for reference table:%s.%s", dbName, tbName);
  taosMemoryFreeClear(pMeta);
  terrno = rc;
  return NULL;
}
1200

1201
// Validate that column `colName` of the referenced table `pTableMeta` can back a virtual-table
// column described by `pSchema`.
//
// The checks run in this order and the first failure is returned:
//   1. the referenced table's timestamp precision equals `precision`
//   2. the referenced table has no composite primary key (second column flagged COL_IS_KEY)
//   3. the referenced table is a normal table or a child table
//   4. `colName` exists in the referenced table
//   5. type and bytes of the referenced column equal those of `pSchema`
//
// returns  TSDB_CODE_SUCCESS, TSDB_CODE_PAR_INVALID_REF_COLUMN or TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE
static int32_t checkColRef(STableMeta* pTableMeta, char* colName, uint8_t precision, const SSchema* pSchema) {
  if (precision != pTableMeta->tableInfo.precision) {
    uError("timestamp precision of virtual table and its reference table do not match");
    return TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE;
  }

  // org table cannot have a composite primary key
  bool hasCompositeKey = pTableMeta->tableInfo.numOfColumns > 1 && (pTableMeta->schema[1].flags & COL_IS_KEY);
  if (hasCompositeKey) {
    uError("virtual table's column:\"%s\"'s reference can not from table with composite key", colName);
    return TSDB_CODE_PAR_INVALID_REF_COLUMN;
  }

  // org table must be child table or normal table
  bool isNormalOrChild = pTableMeta->tableType == TSDB_NORMAL_TABLE || pTableMeta->tableType == TSDB_CHILD_TABLE;
  if (!isNormalOrChild) {
    uError("virtual table's column:\"%s\"'s reference can only be normal table or child table", colName);
    return TSDB_CODE_PAR_INVALID_REF_COLUMN;
  }

  const SSchema* pRefCol = getNormalColSchema(pTableMeta, colName);
  if (pRefCol == NULL) {
    uError("virtual table's column:\"%s\"'s reference column:\"%s\" not exist", pSchema->name, colName);
    return TSDB_CODE_PAR_INVALID_REF_COLUMN;
  }

  bool sameType = pRefCol->type == pSchema->type && pRefCol->bytes == pSchema->bytes;
  if (!sameType) {
    uError("virtual table's column:\"%s\"'s type and reference column:\"%s\"'s type not match, %d %d %d %d",
            pSchema->name, colName, pSchema->type, pSchema->bytes, pRefCol->type, pRefCol->bytes);
    return TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE;
  }

  return TSDB_CODE_SUCCESS;
}
1239

1240
// Validate one column reference of a table that is being created.
//
// The meta of the table named by pColRef (refDbName.refTableName) is fetched, checked against
// `pSchema` with checkColRef, and released again.
//
// returns  the result of checkColRef, or terrno when the referenced table's meta cannot be fetched
static int32_t checkColRefForCreate(SCatalog* pCatalog, SRequestConnInfo* conn, SColRef* pColRef, int32_t acctId, uint8_t precision, SSchema* pSchema) {
  STableMeta* pRefMeta = getTableMeta(pCatalog, conn, pColRef->refDbName, pColRef->refTableName, acctId);
  if (pRefMeta != NULL) {
    int32_t rc = checkColRef(pRefMeta, pColRef->refColName, precision, pSchema);
    taosMemoryFreeClear(pRefMeta);
    return rc;
  }
  return terrno;
}
1249

1250
// Validate the column reference of a column that is being added.
//
// The meta of dbName.tbName and of dbNameSrc.tbNameSrc is fetched; a schema is synthesized from
// colNameSrc/type/bytes and checked with checkColRef against column `colName` of dbName.tbName,
// using the timestamp precision of dbNameSrc.tbNameSrc.
//
// returns  the result of checkColRef, or terrno when either table meta cannot be fetched
static int32_t checkColRefForAdd(SCatalog* pCatalog, SRequestConnInfo* conn, int32_t acctId, char* dbName, char* tbName, char* colName,
  char* dbNameSrc, char* tbNameSrc, char* colNameSrc, int8_t type, int32_t bytes) {
  int32_t code = 0;
  // Both pointers are initialised before the first `goto end`, because the cleanup at `end`
  // releases both of them and must never see an indeterminate value.
  STableMeta* pTableMeta = NULL;
  STableMeta* pTableMetaSrc = NULL;

  pTableMeta = getTableMeta(pCatalog, conn, dbName, tbName, acctId);
  if (pTableMeta == NULL) {
    code = terrno;
    goto end;
  }
  pTableMetaSrc = getTableMeta(pCatalog, conn, dbNameSrc, tbNameSrc, acctId);
  if (pTableMetaSrc == NULL) {
    code = terrno;
    goto end;
  }

  SSchema pSchema = {.type = type, .bytes = bytes};
  tstrncpy(pSchema.name, colNameSrc, TSDB_COL_NAME_LEN);
  code = checkColRef(pTableMeta, colName, pTableMetaSrc->tableInfo.precision, &pSchema);

end:
  taosMemoryFreeClear(pTableMeta);
  taosMemoryFreeClear(pTableMetaSrc);
  return code;
}
1273

1274
// Validate the column reference of a column whose reference is being altered.
//
// The meta of dbName.tbName and of dbNameSrc.tbNameSrc is fetched; the schema of colNameSrc is
// taken from dbNameSrc.tbNameSrc and checked with checkColRef against column `colName` of
// dbName.tbName, using the timestamp precision of dbNameSrc.tbNameSrc.
//
// returns  the result of checkColRef, terrno when either table meta cannot be fetched, or
//          TSDB_CODE_PAR_INVALID_REF_COLUMN when colNameSrc does not exist
static int32_t checkColRefForAlter(SCatalog* pCatalog, SRequestConnInfo* conn, int32_t acctId, char* dbName, char* tbName, char* colName,
  char* dbNameSrc, char* tbNameSrc, char* colNameSrc) {
  int32_t code = 0;
  // Both pointers are initialised before the first `goto end`, because the cleanup at `end`
  // releases both of them and must never see an indeterminate value.
  STableMeta* pTableMeta = NULL;
  STableMeta* pTableMetaSrc = NULL;

  pTableMeta = getTableMeta(pCatalog, conn, dbName, tbName, acctId);
  if (pTableMeta == NULL) {
    code = terrno;
    goto end;
  }
  pTableMetaSrc = getTableMeta(pCatalog, conn, dbNameSrc, tbNameSrc, acctId);
  if (pTableMetaSrc == NULL) {
    code = terrno;
    goto end;
  }
  const SSchema* pSchema = getNormalColSchema(pTableMetaSrc, colNameSrc);
  if (NULL == pSchema) {
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
    uError("virtual table's column:\"%s\" not exist", colNameSrc);
    goto end;
  }

  code = checkColRef(pTableMeta, colName, pTableMetaSrc->tableInfo.precision, pSchema);

end:
  taosMemoryFreeClear(pTableMeta);
  taosMemoryFreeClear(pTableMetaSrc);
  return code;
}
1301

1302
/*
 * Replay a serialized create-table batch (an SMsgHead followed by an encoded SVCreateTbBatchReq)
 * against the database currently selected on the connection `taos`.
 *
 * For every request in the batch:
 *   - the target vgroup is resolved through the catalog;
 *   - TD_CREATE_IF_NOT_EXISTS is forced;
 *   - for (virtual) child tables the super table uid and the tag column ids are remapped to the ones
 *     of the local super table; requests whose super table does not exist locally are skipped;
 *   - column references are optionally redirected to tmqWriteRefDB and validated (tmqWriteCheckRef).
 * The requests are grouped per vgroup, serialized, and executed synchronously.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments or a `metaLen` shorter than an SMsgHead,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED if no database is selected, the code of the first failing step, or
 * the result code of the executed request.
 */
static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must at least cover the message head, otherwise `metaLen - sizeof(SMsgHead)` below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;

  RAW_LOG_START
  // rebuilt tags are collected here so they stay alive until the request finished; released at `end`
  SArray* pTagList = taosArrayInit(0, POINTER_BYTES);
  RAW_NULL_CHECK(pTagList);
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVCreateTbBatchReq(&coder, &req));
  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  // vgId -> SVgroupCreateTableBatch
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
    // change tag cid to new cid
    if (pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) {
      STableMeta* pTableMeta = NULL;
      SName       sName = {0};
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        uInfo(LOG_ID_TAG " super table %s not exist, ignore create child table %s", LOG_ID_VALUE,
              pCreateReq->ctb.stbName, pCreateReq->name);
        code = TSDB_CODE_SUCCESS;
        taosMemoryFreeClear(pTableMeta);
        continue;
      }

      RAW_RETURN_CHECK(code);
      pCreateReq->ctb.suid = pTableMeta->uid;

      // redirect every column reference to the configured reference database, if any
      bool changeDB = strlen(tmqWriteRefDB) > 0;
      for (int32_t i = 0; changeDB && i < pCreateReq->colRef.nCols; i++) {
        SColRef* pColRef = pCreateReq->colRef.pColRef + i;
        tstrncpy(pColRef->refDbName, tmqWriteRefDB, TSDB_DB_NAME_LEN);
      }

      for (int32_t i = 0; tmqWriteCheckRef && i < pCreateReq->colRef.nCols && i < pTableMeta->tableInfo.numOfColumns; i++) {
        SColRef* pColRef = pCreateReq->colRef.pColRef + i;
        if (!pColRef || !pColRef->hasRef) continue;
        SSchema* pSchema = pTableMeta->schema + i;
        code = checkColRefForCreate(pCatalog, &conn, pColRef, pTscObj->acctId, pTableMeta->tableInfo.precision, pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          // pTableMeta is local to this block, so it has to be released before leaving through `end`
          lino = __LINE__;
          taosMemoryFreeClear(pTableMeta);
          goto end;
        }
      }

      SArray* pTagVals = NULL;
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
      if (code != TSDB_CODE_SUCCESS) {
        uError("create tb invalid tag data %s", pCreateReq->name);
        taosMemoryFreeClear(pTableMeta);
        goto end;
      }

      // match every tag name of the request against the local super table's tag schema and adopt
      // the local column id when it differs (JSON tags are left untouched)
      bool rebuildTag = false;
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
        if (tName == NULL) {
          continue;
        }
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
          SSchema* tag = &pTableMeta->schema[j];
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
            if (pTagVal) {
              if (pTagVal->cid != tag->colId) {
                pTagVal->cid = tag->colId;
                rebuildTag = true;
              }
            } else {
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
            }
          }
        }
      }
      taosMemoryFreeClear(pTableMeta);
      if (rebuildTag) {
        STag* ppTag = NULL;
        code = tTagNew(pTagVals, 1, false, &ppTag);
        taosArrayDestroy(pTagVals);
        pTagVals = NULL;
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        if (NULL == taosArrayPush(pTagList, &ppTag)) {
          code = terrno;
          tTagFree(ppTag);
          goto end;
        }
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
      }
      taosArrayDestroy(pTagVals);
    }
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // until the batch is stored in the hash map its array is owned here and must be released on failure
      if (taosArrayPush(tBatch.req.pArray, pCreateReq) == NULL) {
        code = terrno;
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      tBatch.req.source = TD_REQ_FROM_TAOX;
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != 0) {
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
    }
  }

  // every request was skipped: nothing to send
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  // NOTE(review): pBufArray is not released on the error paths between here and
  // rewriteToVnodeModifyOpStmt(); confirm who owns it when those steps fail.
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_NULL_CHECK(pQuery->pRoot);

  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteSVCreateTbBatchReq(&req);

  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  taosArrayDestroyP(pTagList, NULL);
  RAW_LOG_END
  return code;
}
1493

1494
// Drop-table requests that target one vgroup; stored as the value type of the per-vgroup hash map
// built in taosDropTable().
typedef struct SVgroupDropTableBatch {
  SVDropTbBatchReq req;                       // req.pArray collects the SVDropTbReq entries of this vgroup
  SVgroupInfo      info;                      // vgroup the requests are sent to
  char             dbName[TSDB_DB_NAME_LEN];  // NOTE(review): taosDropTable() never fills this field - confirm it is needed
} SVgroupDropTableBatch;
1499

1500
static void destroyDropTbReqBatch(void* data) {
866✔
1501
  if (data == NULL) {
866✔
1502
    uError("invalid parameter in %s", __func__);
×
1503
    return;
×
1504
  }
1505
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
866✔
1506
  taosArrayDestroy(pTbBatch->req.pArray);
866✔
1507
}
1508

1509
/*
 * Replay a serialized drop-table batch (an SMsgHead followed by an encoded SVDropTbBatchReq)
 * against the database currently selected on the connection `taos`.
 *
 * Every request gets `igNotExists` forced to true and its suid replaced by the suid of the local
 * table; tables that do not exist locally are skipped. The remaining requests are grouped per
 * vgroup, serialized, and executed synchronously.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments or a `metaLen` shorter than an SMsgHead,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED if no database is selected, the code of the first failing step, or
 * the result code of the executed request.
 */
static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must at least cover the message head, otherwise `metaLen - sizeof(SMsgHead)` below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVDropTbBatchReq(&coder, &req));
  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));

  // vgId -> SVgroupDropTableBatch
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to drop table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      code = TSDB_CODE_SUCCESS;
      uInfo(LOG_ID_TAG " table %s not exist, ignore drop", LOG_ID_VALUE, pDropReq->name);
      taosMemoryFreeClear(pTableMeta);
      continue;
    }
    RAW_RETURN_CHECK(code);
    // the suid carried by the request is replaced with the suid of the local table
    tb_uid_t oldSuid = pDropReq->suid;
    pDropReq->suid = pTableMeta->suid;
    taosMemoryFreeClear(pTableMeta);
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
           pDropReq->suid);

    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // until the batch is stored in the hash map its array is owned here and must be released on failure
      if (taosArrayPush(tBatch.req.pArray, pDropReq) == NULL) {
        code = terrno;
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != 0) {
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
    }
  }

  // every table was skipped: nothing to send
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  // NOTE(review): pBufArray is not released on the error paths between here and
  // rewriteToVnodeModifyOpStmt(); confirm who owns it when those steps fail.
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }
  code = pRequest->code;
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  RAW_LOG_END
  return code;
}
1622

1623
/*
 * Replay a serialized delete result (an SMsgHead followed by an encoded SDeleteRes) by building and
 * executing the equivalent "delete from ... where ts >= skey and ts <= ekey" statement on `taos`.
 *
 * TSDB_CODE_PAR_TABLE_NOT_EXIST and TSDB_CODE_PAR_GET_META_ERROR reported by the statement are
 * treated as success.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, a `metaLen` shorter than an SMsgHead, or a
 * statement that does not fit the SQL buffer; otherwise the decode error or the statement's result.
 */
static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must at least cover the message head, otherwise `metaLen - sizeof(SMsgHead)` below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  char       sql[1024] = {0};
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);

  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeDeleteRes(&coder, &req));
  int32_t sqlLen = snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                            req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  // A truncated statement must never be executed: cutting the text inside the trailing timestamp
  // still yields valid SQL, but one that deletes a different time range.
  if (sqlLen < 0 || (size_t)sqlLen >= sizeof(sql)) {
    uError("connId:0x%" PRIx64 " delete sql of table %s does not fit the buffer, len:%d", *(int64_t*)taos,
           req.tableFName, sqlLen);
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto end;
  }

  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
  RAW_NULL_CHECK(res);
  SRequestObj* pRequest = (SRequestObj*)res;
  code = pRequest->code;
  // a table missing on this side means there is nothing to delete
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));

end:
  RAW_LOG_END
  tDecoderClear(&coder);
  return code;
}
1658

1659
/*
 * Replay a serialized alter-table request (an SMsgHead followed by an encoded SVAlterTbReq) against
 * the database currently selected on the connection `taos`.
 *
 * TSDB_ALTER_TABLE_UPDATE_OPTIONS requests are ignored. For the other actions the request is marked
 * as coming from TD_REQ_FROM_TAOX, optionally redirected to tmqWriteRefDB, optionally validated
 * (tmqWriteCheckRef) for the two column-reference actions, re-encoded for the target vgroup, and
 * executed synchronously. TSDB_CODE_TDB_TABLE_NOT_EXIST from the execution is treated as success.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments or a `metaLen` shorter than an SMsgHead,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED if no database is selected, the code of the first failing step, or
 * the (possibly post-processed) result code of the executed request.
 */
static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must at least cover the message head, otherwise `metaLen - sizeof(SMsgHead)` below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVAlterTbReq   req = {0};
  SDecoder       dcoder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;
  SEncoder       coder = {0};

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);
  RAW_RETURN_CHECK(tDecodeSVAlterTbReq(&dcoder, &req));
  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    uInfo(LOG_ID_TAG " alter table action is UPDATE_OPTIONS, ignore", LOG_ID_VALUE);
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
  pArray = taosArrayInit(1, sizeof(void*));
  RAW_NULL_CHECK(pArray);

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  RAW_NULL_CHECK(pVgData);
  pVgData->vg = pInfo;

  int tlen = 0;
  req.source = TD_REQ_FROM_TAOX;

  // redirect the column reference to the configured reference database, if any
  if (strlen(tmqWriteRefDB) > 0) {
    req.refDbName = tmqWriteRefDB;
  }

  if (req.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF && tmqWriteCheckRef) {
    RAW_RETURN_CHECK(checkColRefForAlter(pCatalog, &conn, pTscObj->acctId, req.refDbName, req.refTbName, req.refColName,
      pRequest->pDb, req.tbName, req.colName));
  } else if (req.action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF && tmqWriteCheckRef) {
    RAW_RETURN_CHECK(checkColRefForAdd(pCatalog, &conn, pTscObj->acctId, req.refDbName, req.refTbName, req.refColName,
      pRequest->pDb, req.tbName, req.colName, req.type, req.bytes));
  }

  // re-encode the (modified) request behind a fresh message head
  tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
  RAW_RETURN_CHECK(code);
  tlen += sizeof(SMsgHead);
  void* pMsg = taosMemoryMalloc(tlen);
  RAW_NULL_CHECK(pMsg);
  // hand the buffer to pVgData right away so that the cleanup at `end` releases it even when the
  // encoding below fails
  pVgData->pData = pMsg;
  pVgData->size = tlen;
  pVgData->numOfTables = 1;

  ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
  ((SMsgHead*)pMsg)->contLen = htonl(tlen);
  void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
  code = tEncodeSVAlterTbReq(&coder, &req);
  RAW_RETURN_CHECK(code);

  RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));

  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (NULL == pQuery) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);

  // from here on the cleanup at `end` must no longer release the data block and its array
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
    code = TSDB_CODE_SUCCESS;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
    if (pRes->res != NULL) {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));

end:
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&dcoder);
  qDestroyQuery(pQuery);
  taosArrayDestroy(req.pMultiTag);
  tEncoderClear(&coder);
  RAW_LOG_END
  return code;
}
1785

1786
// Convenience wrapper around taos_write_raw_block_with_fields_with_reqid() that passes 0 as the
// request id.
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
                                     int numFields) {
  const int64_t defaultReqId = 0;
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, defaultReqId);
}
1790

1791
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
285✔
1792
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
1793
  if (taos == NULL || pData == NULL || tbname == NULL) {
285✔
1794
    uError("invalid parameter in %s", __func__);
×
1795
    return TSDB_CODE_INVALID_PARA;
×
1796
  }
1797
  int32_t     code = TSDB_CODE_SUCCESS;
285✔
1798
  int32_t     lino = 0;
285✔
1799
  STableMeta* pTableMeta = NULL;
285✔
1800
  SQuery*     pQuery = NULL;
285✔
1801
  SHashObj*   pVgHash = NULL;
285✔
1802

1803
  SRequestObj* pRequest = NULL;
285✔
1804
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
285✔
1805

1806
  uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
285✔
1807
         rows, pData, tbname, fields, numFields);
1808

1809
  pRequest->syncQuery = true;
285✔
1810
  if (!pRequest->pDb) {
285✔
1811
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
1812
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1813
    goto end;
×
1814
  }
1815

1816
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
285✔
1817
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
285✔
1818
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
285✔
1819

1820
  struct SCatalog* pCatalog = NULL;
285✔
1821
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
285✔
1822

1823
  SRequestConnInfo conn = {0};
285✔
1824
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
285✔
1825
  conn.requestId = pRequest->requestId;
285✔
1826
  conn.requestObjRefId = pRequest->self;
285✔
1827
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
285✔
1828

1829
  SVgroupInfo vgData = {0};
285✔
1830
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
285✔
1831
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
285✔
1832
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
285✔
1833
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
285✔
1834
  RAW_NULL_CHECK(pVgHash);
285✔
1835
  RAW_RETURN_CHECK(
285✔
1836
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1837
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
285✔
1838
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
285✔
1839

1840
  launchQueryImpl(pRequest, pQuery, true, NULL);
285✔
1841
  code = pRequest->code;
285✔
1842
  uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code));
285✔
1843

1844
end:
285✔
1845
  taosMemoryFreeClear(pTableMeta);
285✔
1846
  qDestroyQuery(pQuery);
285✔
1847
  destroyRequest(pRequest);
285✔
1848
  taosHashCleanup(pVgHash);
285✔
1849
  RAW_LOG_END
285✔
1850
  return code;
285✔
1851
}
1852

1853
// Convenience wrapper around taos_write_raw_block_with_reqid() that passes 0 as the request id.
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  const int64_t defaultReqId = 0;
  return taos_write_raw_block_with_reqid(taos, rows, pData, tbname, defaultReqId);
}
1856

1857
/*
 * Write one raw data block into table `tbname` of the database currently selected on `taos`.
 *
 * The table's vgroup and meta are resolved through the catalog, the block is bound to an insert
 * query (no field list is passed to rawBlockBindData()), and the query is executed synchronously
 * under request id `reqid`. `rows` is only logged.
 *
 * Returns TSDB_CODE_INVALID_PARA if `taos`, `pData` or `tbname` is NULL,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED if no database is selected, the code of the first failing step, or
 * the result code of the executed request.
 */
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
  if (taos == NULL || pData == NULL || tbname == NULL) {
    // report the rejection like the sibling entry points of this file do
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STableMeta* pTableMeta = NULL;
  SQuery*     pQuery = NULL;
  SHashObj*   pVgHash = NULL;

  SRequestObj* pRequest = NULL;
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));

  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // full name of the target table: account / selected database / tbname
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));

  struct SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  SVgroupInfo vgData = {0};
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
  // single-entry map vgId -> vgroup info, consumed by smlBuildOutput()
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgHash);
  RAW_RETURN_CHECK(
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false));
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;
  uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  taosMemoryFreeClear(pTableMeta);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  taosHashCleanup(pVgHash);
  RAW_LOG_END
  return code;
}
1915

1916
static void* getRawDataFromRes(void* pRetrieve) {
36,538✔
1917
  if (pRetrieve == NULL) {
36,538✔
1918
    uError("invalid parameter in %s", __func__);
×
1919
    return NULL;
×
1920
  }
1921
  void* rawData = NULL;
36,538✔
1922
  // deal with compatibility
1923
  if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
36,538✔
1924
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
1925
  } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
36,538✔
1926
             *(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
×
1927
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
36,538✔
1928
  }
1929
  return rawData;
36,538✔
1930
}
1931

1932
/*
 * Decode every create-table request carried by `rsp` and store it in `pHashObj`, keyed by table name.
 *
 * Only TSDB_CHILD_TABLE requests are accepted; any other type aborts with TSDB_CODE_INVALID_MSG.
 * When a table name is already present in the map, the newly decoded duplicate is destroyed and
 * the existing entry is kept.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the code of the failing step.
 */
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
  if (rsp == NULL || pHashObj == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // find schema data info
  int32_t       code = 0;
  int32_t       lino = 0;
  SDecoder      reqDecoder = {0};
  SVCreateTbReq createReq = {0};
  RAW_LOG_START
  for (int idx = 0; idx < rsp->createTableNum; idx++) {
    void** ppReqBuf = taosArrayGet(rsp->createTableReq, idx);
    RAW_NULL_CHECK(ppReqBuf);
    int32_t* pReqLen = taosArrayGet(rsp->createTableLen, idx);
    RAW_NULL_CHECK(pReqLen);

    tDecoderInit(&reqDecoder, *ppReqBuf, *pReqLen);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&reqDecoder, &createReq));

    if (createReq.type != TSDB_CHILD_TABLE) {
      uError("invalid table type %d in %s", createReq.type, __func__);
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }

    size_t nameLen = strlen(createReq.name);
    if (taosHashGet(pHashObj, createReq.name, nameLen) != NULL) {
      // duplicate table name: keep the entry already stored and drop this one
      tDestroySVCreateTbReq(&createReq, TSDB_MSG_FLG_DECODE);
    } else {
      RAW_RETURN_CHECK(taosHashPut(pHashObj, createReq.name, nameLen, &createReq, sizeof(SVCreateTbReq)));
    }

    tDecoderClear(&reqDecoder);
    // reset the scratch request so that the cleanup at `end` does not destroy it again
    createReq = (SVCreateTbReq){0};
  }

end:
  tDecoderClear(&reqDecoder);
  tDestroySVCreateTbReq(&createReq, TSDB_MSG_FLG_DECODE);
  RAW_LOG_END
  return code;
}
1974

1975
// Values for the file-scope `initedFlag`.
// NOTE(review): the code that moves between these states is not part of this excerpt; the names
// suggest not-yet-initialized / initialized / initialization-failed - confirm against the users.
typedef enum {
  WRITE_RAW_INIT_START = 0,
  WRITE_RAW_INIT_OK,
  WRITE_RAW_INIT_FAIL,
} WRITE_RAW_INIT_STATUS;
1980

1981
static SHashObj* writeRawCache = NULL;
1982
static int8_t    initFlag = 0;
1983
static int8_t    initedFlag = WRITE_RAW_INIT_START;
1984

1985
typedef struct {
1986
  SHashObj* pVgHash;
1987
  SHashObj* pNameHash;
1988
  SHashObj* pMetaHash;
1989
} rawCacheInfo;
1990

1991
typedef struct {
1992
  SVgroupInfo vgInfo;
1993
  int64_t     uid;
1994
  int64_t     suid;
1995
} tbInfo;
1996

1997
static void tmqFreeMeta(void* data) {
10,816✔
1998
  if (data == NULL) {
10,816✔
1999
    uError("invalid parameter in %s", __func__);
×
2000
    return;
×
2001
  }
2002
  STableMeta* pTableMeta = *(STableMeta**)data;
10,816✔
2003
  taosMemoryFree(pTableMeta);
10,816✔
2004
}
2005

2006
static void freeRawCache(void* data) {
×
2007
  if (data == NULL) {
×
2008
    uError("invalid parameter in %s", __func__);
×
2009
    return;
×
2010
  }
2011
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
2012
  taosHashCleanup(pRawCache->pMetaHash);
×
2013
  taosHashCleanup(pRawCache->pNameHash);
×
2014
  taosHashCleanup(pRawCache->pVgHash);
×
2015
}
2016

2017
static int32_t initRawCacheHash() {
4,539✔
2018
  if (writeRawCache == NULL) {
4,539✔
2019
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
4,539✔
2020
    if (writeRawCache == NULL) {
4,539✔
2021
      return terrno;
×
2022
    }
2023
    taosHashSetFreeFp(writeRawCache, freeRawCache);
4,539✔
2024
  }
2025
  return 0;
4,539✔
2026
}
2027

2028
// Decides whether the cached table meta no longer matches the schema carried
// with a raw data block.
//
// Returns true when the column counts differ, when a column of `pSW` has no
// column of the same name in `pTableMeta`, or when checkSchema() reports a
// mismatch for a matched column. Returns false otherwise, and also when any
// argument is NULL.
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
  if (rawData == NULL || pSW == NULL) {
    return false;
  }
  if (pTableMeta == NULL) {
    uError("invalid parameter in %s", __func__);
    return false;
  }

  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
  // column length |
  // Skip the fixed header (five int32_t fields and one uint64_t) to reach the
  // per-column schema entries. The cast is explicit because char* and int8_t*
  // are distinct pointer types.
  int8_t* fields = (int8_t*)((char*)rawData + 5 * sizeof(int32_t) + sizeof(uint64_t));

  if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
    return true;
  }

  for (int i = 0; i < pSW->nCols; i++) {
    const char* fieldName = pSW->pSchema[i].name;
    bool        matched = false;

    for (int j = 0; j < pTableMeta->tableInfo.numOfColumns; j++) {
      SSchema* pColSchema = &pTableMeta->schema[j];
      if (strcmp(pColSchema->name, fieldName) != 0) {
        continue;
      }
      if (checkSchema(pColSchema, &pTableMeta->schemaExt[j], fields, NULL, 0) != 0) {
        return true;
      }
      matched = true;
      break;
    }

    if (!matched) {
      return true;  // column is unknown to the cached meta
    }
    // each schema entry in the block is one int8_t followed by one int32_t
    fields += sizeof(int8_t) + sizeof(int32_t);
  }
  return false;
}
2071

2072
// Looks up the per-connection hashes cached in writeRawCache under the pointer
// value `key`, creating and registering them on a miss.
//
// On success the three out-parameters point at hashes owned by writeRawCache.
// On failure any hash created here is released and the out-parameters are left
// untouched, so the caller never sees a pointer to freed memory.
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t   code = 0;
  int32_t   lino = 0;
  SHashObj* vgHash = NULL;
  SHashObj* nameHash = NULL;
  SHashObj* metaHash = NULL;
  RAW_LOG_START
  rawCacheInfo* cached = (rawCacheInfo*)taosHashGet(writeRawCache, &key, POINTER_BYTES);
  if (cached != NULL) {
    *pVgHash = cached->pVgHash;
    *pNameHash = cached->pNameHash;
    *pMetaHash = cached->pMetaHash;
    goto end;
  }

  vgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(vgHash);
  nameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(nameHash);
  metaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(metaHash);
  taosHashSetFreeFp(metaHash, tmqFreeMeta);

  rawCacheInfo info = {vgHash, nameHash, metaHash};
  RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));

  // registered successfully: writeRawCache owns the hashes from here on
  *pVgHash = vgHash;
  *pNameHash = nameHash;
  *pMetaHash = metaHash;
  vgHash = NULL;
  nameHash = NULL;
  metaHash = NULL;

end:
  if (code != 0) {
    taosHashCleanup(metaHash);
    taosHashCleanup(nameHash);
    taosHashCleanup(vgHash);
  }
  RAW_LOG_END
  return code;
}
2107

2108
// Builds a synchronous request object for the connection `taos`, resolves the
// catalog handle of its cluster and fills `conn` from the request.
//
// Fails with TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database.
// The request created by buildRequest() is left in *pRequest even when a later
// step fails.
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
  if (taos == NULL || pRequest == NULL || pCatalog == NULL || conn == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  RAW_LOG_START
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));

  SRequestObj* pReq = *pRequest;
  pReq->syncQuery = true;
  if (pReq->pDb == NULL) {
    uError("%s no database selected", __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  RAW_RETURN_CHECK(catalogGetHandle(pReq->pTscObj->pAppInfo->clusterId, pCatalog));
  conn->pTrans = pReq->pTscObj->pAppInfo->pTransporter;
  conn->requestId = pReq->requestId;
  conn->requestObjRefId = pReq->self;
  conn->mgmtEps = getEpSet_s(&pReq->pTscObj->pAppInfo->mgmtEp);

end:
  RAW_LOG_END
  return code;
}
2134

2135
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
2136
static int32_t  decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func,
12,437✔
2137
                              SMqRspObj* rspObj) {
2138
  if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) {
12,437✔
2139
    uError("invalid parameter in %s", __func__);
×
2140
    return TSDB_CODE_INVALID_PARA;
×
2141
  }
2142
  int8_t dataVersion = *(int8_t*)data;
12,437✔
2143
  if (dataVersion >= MQ_DATA_RSP_VERSION) {
12,437✔
2144
    data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
12,437✔
2145
    if (dataLen < sizeof(int8_t) + sizeof(int32_t)) {
12,437✔
2146
      return TSDB_CODE_INVALID_PARA;
×
2147
    }
2148
    dataLen -= sizeof(int8_t) + sizeof(int32_t);
12,437✔
2149
  }
2150

2151
  rspObj->resIter = -1;
12,437✔
2152
  tDecoderInit(decoder, data, dataLen);
12,437✔
2153
  int32_t code = func(decoder, &rspObj->dataRsp);
12,437✔
2154
  if (code != 0) {
12,437✔
2155
    SET_ERROR_MSG("decode mq taosx data rsp failed");
×
2156
  }
2157
  return code;
12,437✔
2158
}
2159

2160
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
36,538✔
2161
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
2162
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
2163
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
36,538✔
2164
      pMeta == NULL) {
2165
    uError("invalid parameter in %s", __func__);
×
2166
    return TSDB_CODE_INVALID_PARA;
×
2167
  }
2168
  int32_t code = 0;
36,538✔
2169
  int32_t lino = 0;
36,538✔
2170
  RAW_LOG_START
36,538✔
2171
  STableMeta* pTableMeta = NULL;
36,538✔
2172
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
36,538✔
2173
  if (tmpInfo == NULL || retry > 0) {
36,538✔
2174
    tbInfo info = {0};
32,828✔
2175

2176
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
32,828✔
2177
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
32,828✔
2178
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
4,128✔
2179
    }
2180
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
32,828✔
2181
    info.uid = pTableMeta->uid;
32,828✔
2182
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
32,828✔
2183
      info.suid = pTableMeta->suid;
23,091✔
2184
    } else {
2185
      info.suid = pTableMeta->uid;
9,737✔
2186
    }
2187
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
32,828✔
2188
    RAW_RETURN_CHECK(code);
32,828✔
2189

2190
    uDebug("put table meta to hash1, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d", info.suid,
32,828✔
2191
           taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2192
    if (pCreateReqDst) {
32,828✔
2193
      pTableMeta->vgId = info.vgInfo.vgId;
4,128✔
2194
      pTableMeta->uid = pCreateReqDst->uid;
4,128✔
2195
      pCreateReqDst->ctb.suid = pTableMeta->suid;
4,128✔
2196
    }
2197

2198
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
32,828✔
2199
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
32,828✔
2200
    RAW_RETURN_CHECK(
32,828✔
2201
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
2202
  }
2203

2204
  if (pTableMeta == NULL || retry > 0) {
36,538✔
2205
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
3,710✔
2206
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
3,710✔
2207
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
1,139✔
2208
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
1,139✔
2209
      RAW_RETURN_CHECK(code);
1,139✔
2210
      uDebug("put table meta to hash2, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d",
1,139✔
2211
             tmpInfo->suid, taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2212
    } else {
2213
      pTableMeta = *pTableMetaTmp;
2,571✔
2214
      pTableMeta->uid = tmpInfo->uid;
2,571✔
2215
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
2,571✔
2216
    }
2217
  }
2218
  *pMeta = pTableMeta;
36,538✔
2219
  pTableMeta = NULL;
36,538✔
2220

2221
end:
36,538✔
2222
  taosMemoryFree(pTableMeta);
36,538✔
2223
  RAW_LOG_END
36,538✔
2224
  return code;
36,538✔
2225
}
2226

2227
// Writes a serialized data response (decoded with tDecodeMqDataRsp) through the
// connection `taos`.
//
// Each block is bound to its table meta with rawBlockBindData(), then the query
// is built and launched synchronously. When the result code satisfies
// NEED_CLIENT_HANDLE_ERROR, the write is redone from the first block, up to
// three times, with `retry > 0` passed to processCacheMeta().
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int              retry = 0;
  SQuery*          pQuery = NULL;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};
  SRequestConnInfo conn = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);

    // resIter starts at -1 (set by decodeRawData, and again before a retry)
    for (++rspObj.resIter; rspObj.resIter < rspObj.dataRsp.blockNum; ++rspObj.resIter) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
      RAW_NULL_CHECK(pSW);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, pSW,
                                        rawData, retry));

      char err[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // stop unless the error is retryable and attempts remain
    if (!(NEED_CLIENT_HANDLE_ERROR(code)) || retry++ >= 3) {
      break;
    }
    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;
  }
  uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  RAW_LOG_END
  return code;
}
2301

2302
// Writes a serialized data-plus-metadata response (decoded with
// tDecodeSTaosxRsp) through the connection `taos`.
//
// The create-table requests carried by the message are decoded into a local
// hash keyed by table name. Each data block is then bound with the matching
// create request (if any), so missing child tables can be created as part of
// the write. When the result code satisfies NEED_CLIENT_HANDLE_ERROR, the write
// is redone from the first block, up to three times.
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int              retry = 0;
  SQuery*          pQuery = NULL;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SHashObj*        pCreateTbHash = NULL;
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};
  SRequestConnInfo conn = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));

  pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pCreateTbHash);
  RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));

  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);

    // resIter starts at -1 (set by decodeRawData, and again before a retry)
    for (++rspObj.resIter; rspObj.resIter < rspObj.dataRsp.blockNum; ++rspObj.resIter) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
      RAW_NULL_CHECK(pSW);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // create-table request for this table, NULL when the message carries none
      SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, pName.tname, strlen(pName.tname));
      STableMeta*    pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &pName,
                                        &pTableMeta, pSW, rawData, retry));

      char err[ERR_MSG_LEN] = {0};
      code =
          rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // stop unless the error is retryable and attempts remain
    if (!(NEED_CLIENT_HANDLE_ERROR(code)) || retry++ >= 3) {
      break;
    }
    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;
  }
  uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteSTaosxRsp(&rspObj.dataRsp);
  // the hash only stores the request structs; destroy each stored request first
  for (void* pIter = taosHashIterate(pCreateTbHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pCreateTbHash, pIter)) {
    tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
  }
  taosHashCleanup(pCreateTbHash);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  RAW_LOG_END
  return code;
}
2391

2392
// Writes a serialized data response (decoded with tDecodeMqDataRsp) whose
// blocks are bound with rawBlockBindRawData(): they are grouped per vgroup in a
// temporary hash and appended to the statement's pVgDataBlocks array.
//
// The temporary vgroup hash is recreated for every attempt. When the result
// code satisfies NEED_CLIENT_HANDLE_ERROR, the write is redone from the first
// block, up to three times.
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int              retry = 0;
  SQuery*          pQuery = NULL;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SHashObj*        pVgroupHash = NULL;
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};
  SRequestConnInfo conn = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
    pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(pVgroupHash);
    pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
    RAW_NULL_CHECK(pStmt->pVgDataBlocks);

    // resIter starts at -1 (set by decodeRawData, and again before a retry)
    for (++rspObj.resIter; rspObj.resIter < rspObj.dataRsp.blockNum; ++rspObj.resIter) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // no schema wrapper or raw data is passed, so needRefreshMeta() returns false here
      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, NULL,
                                        NULL, retry));

      char err[ERR_MSG_LEN] = {0};
      code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
        goto end;
      }
    }
    taosHashCleanup(pVgroupHash);
    pVgroupHash = NULL;

    RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // stop unless the error is retryable and attempts remain
    if (!(NEED_CLIENT_HANDLE_ERROR(code)) || retry++ >= 3) {
      break;
    }
    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;
  }
  uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  taosHashCleanup(pVgroupHash);
  destroyRequest(pRequest);
  RAW_LOG_END
  return code;
}
2477

2478
// Converts one meta response to JSON by dispatching on pMetaRsp->resMsgType to
// the matching process* helper. For a message type that is not handled here,
// *meta is left untouched and 0 is returned.
static int32_t processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
  if (pMetaRsp == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  RAW_LOG_START
  switch (pMetaRsp->resMsgType) {
    case TDMT_VND_CREATE_STB:
      RAW_RETURN_CHECK(processCreateStb(pMetaRsp, meta));
      break;
    case TDMT_VND_ALTER_STB:
      RAW_RETURN_CHECK(processAlterStb(pMetaRsp, meta));
      break;
    case TDMT_VND_DROP_STB:
      RAW_RETURN_CHECK(processDropSTable(pMetaRsp, meta));
      break;
    case TDMT_VND_CREATE_TABLE:
      RAW_RETURN_CHECK(processCreateTable(pMetaRsp, meta));
      break;
    case TDMT_VND_ALTER_TABLE:
      RAW_RETURN_CHECK(processAlterTable(pMetaRsp, meta));
      break;
    case TDMT_VND_DROP_TABLE:
      RAW_RETURN_CHECK(processDropTable(pMetaRsp, meta));
      break;
    case TDMT_VND_DELETE:
      RAW_RETURN_CHECK(processDeleteTable(pMetaRsp, meta));
      break;
    default:
      break;
  }

end:
  RAW_LOG_END
  return code;
}
2506

2507
// Decodes a batch meta response and renders it as one JSON document of the form
// {"tmq_meta_version": ..., "metas": [ ... ]}, with one array item per embedded
// meta message that processSimpleMeta() produces JSON for.
//
// On success *string receives the text returned by cJSON_PrintUnformatted()
// (NULL if printing fails). All decoders, decoded messages and JSON nodes
// created here are released on every path.
static int32_t processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
  if (pMsgRsp == NULL || string == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDecoder        coder = {0};
  SDecoder        metaCoder = {0};
  SMqBatchMetaRsp rsp = {0};
  SMqMetaRsp      metaRsp = {0};
  int32_t         code = 0;
  int32_t         lino = 0;
  cJSON*          pJson = NULL;
  cJSON*          pItem = NULL;  // JSON of the current meta until it is attached to the array
  tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
  RAW_RETURN_CHECK(tDecodeMqBatchMetaRsp(&coder, &rsp));

  pJson = cJSON_CreateObject();
  RAW_NULL_CHECK(pJson);
  RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION));
  cJSON* pMetaArr = cJSON_AddArrayToObject(pJson, "metas");
  RAW_NULL_CHECK(pMetaArr);

  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // each buffer starts with an SMqRspHead; reject lengths that would underflow
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      code = TSDB_CODE_INVALID_MSG;
      lino = __LINE__;
      goto end;
    }
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    RAW_RETURN_CHECK(tDecodeMqMetaRsp(&metaCoder, &metaRsp));
    RAW_RETURN_CHECK(processSimpleMeta(&metaRsp, &pItem));

    // release per-item state and reset it so the cleanup at `end` is a no-op
    tDeleteMqMetaRsp(&metaRsp);
    metaRsp = (SMqMetaRsp){0};
    tDecoderClear(&metaCoder);

    if (pItem != NULL) {
      RAW_FALSE_CHECK(cJSON_AddItemToArray(pMetaArr, pItem));
      pItem = NULL;  // now owned by pMetaArr
    }
  }

  *string = cJSON_PrintUnformatted(pJson);

end:
  cJSON_Delete(pItem);  // non-NULL only if the item was never attached
  tDeleteMqMetaRsp(&metaRsp);
  tDecoderClear(&metaCoder);
  cJSON_Delete(pJson);
  tDeleteMqBatchMetaRsp(&rsp);
  tDecoderClear(&coder);
  RAW_LOG_END
  return code;
}
2551

2552
// Returns the JSON text describing the meta carried by a TMQ result, or NULL
// when `res` is NULL, is not a meta / metadata / batch-meta result, or the
// conversion fails.
char* tmq_get_json_meta(TAOS_RES* res) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  char*   string = NULL;
  RAW_LOG_START
  RAW_NULL_CHECK(res);
  RAW_FALSE_CHECK(TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res));

  SMqRspObj* rspObj = (SMqRspObj*)res;
  if (TD_RES_TMQ_METADATA(res)) {
    RAW_RETURN_CHECK(processAutoCreateTable(&rspObj->dataRsp, &string));
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    RAW_RETURN_CHECK(processBatchMetaToJson(&rspObj->batchMetaRsp, &string));
  } else if (TD_RES_TMQ_META(res)) {
    cJSON* pJson = NULL;
    RAW_RETURN_CHECK(processSimpleMeta(&rspObj->metaRsp, &pJson));
    string = cJSON_PrintUnformatted(pJson);
    cJSON_Delete(pJson);
  } else {
    uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res);
  }

  // string can still be NULL here (e.g. nothing was produced for the message
  // type); never hand a NULL pointer to %s
  uDebug("tmq_get_json_meta string:%s", string != NULL ? string : "(null)");

end:
  RAW_LOG_END
  return string;
}
2580

2581
// Frees a JSON string with taosMemoryFreeClear(); presumably the counterpart of
// tmq_get_json_meta() (inferred from the name - confirm against the public header).
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
74,661✔
2582

2583
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
12,473✔
2584
  if (pRsp == NULL) {
12,473✔
2585
    uError("invalid parameter in %s", __func__);
×
2586
    return TSDB_CODE_INVALID_PARA;
×
2587
  }
2588
  int32_t pos = 0;
12,473✔
2589
  int32_t code = 0;
12,473✔
2590
  int32_t lino = 0;
12,473✔
2591
  RAW_LOG_START
12,473✔
2592
  SEncoder coder = {0};
12,473✔
2593
  tEncoderInit(&coder, NULL, 0);
12,473✔
2594
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset));
12,473✔
2595
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset));
12,473✔
2596
  pos = coder.pos;
12,473✔
2597
  tEncoderClear(&coder);
12,473✔
2598

2599
end:
12,473✔
2600
  if (code != 0) {
12,473✔
2601
    uError("getOffSetLen failed, code:%d", code);
×
2602
    return code;
×
2603
  } else {
2604
    uDebug("getOffSetLen success, len:%d", pos);
12,473✔
2605
    return pos;
12,473✔
2606
  }
2607
}
2608

2609
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
2610
static int32_t  encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
12,473✔
2611
  if (raw == NULL || encodeFunc == NULL || rspObj == NULL) {
12,473✔
2612
    uError("invalid parameter in %s", __func__);
×
2613
    return TSDB_CODE_INVALID_PARA;
×
2614
  }
2615
  uint32_t len = 0;
12,473✔
2616
  int32_t  code = 0;
12,473✔
2617
  int32_t  lino = 0;
12,473✔
2618
  SEncoder encoder = {0};
12,473✔
2619
  void*    buf = NULL;
12,473✔
2620
  tEncodeSize(encodeFunc, rspObj, len, code);
12,473✔
2621
  RAW_FALSE_CHECK(code >= 0);
12,473✔
2622
  len += sizeof(int8_t) + sizeof(int32_t);
12,473✔
2623
  buf = taosMemoryCalloc(1, len);
12,473✔
2624
  RAW_NULL_CHECK(buf);
12,473✔
2625
  tEncoderInit(&encoder, buf, len);
12,473✔
2626
  RAW_RETURN_CHECK(tEncodeI8(&encoder, MQ_DATA_RSP_VERSION));
12,473✔
2627
  int32_t offsetLen = getOffSetLen(rspObj);
12,473✔
2628
  RAW_FALSE_CHECK(offsetLen > 0);
12,473✔
2629
  RAW_RETURN_CHECK(tEncodeI32(&encoder, offsetLen));
12,473✔
2630
  RAW_RETURN_CHECK(encodeFunc(&encoder, rspObj));
12,473✔
2631

2632
  raw->raw = buf;
12,473✔
2633
  buf = NULL;
12,473✔
2634
  raw->raw_len = len;
12,473✔
2635

2636
end:
12,473✔
2637
  RAW_LOG_END
12,473✔
2638
  return code;
12,473✔
2639
}
2640

2641
// Fills `raw` with the raw form of a TMQ result.
//
// - meta:       raw points at the meta buffer held by the result (negative lengths become 0)
// - data:       the data response is re-encoded into a new buffer
// - metadata:   the data response is re-encoded into a new buffer
// - batch meta: raw points at the batch meta buffer held by the result
// - raw data:   the result's rawData pointer is moved into raw and cleared in the result
// Any other result type yields TSDB_CODE_TMQ_INVALID_MSG.
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (raw == NULL || res == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  RAW_LOG_START
  *raw = (tmq_raw_data){0};
  SMqRspObj* rspObj = ((SMqRspObj*)res);
  if (TD_RES_TMQ_META(res)) {
    SMqMetaRsp* pMetaRsp = &rspObj->metaRsp;
    raw->raw = pMetaRsp->metaRsp;
    raw->raw_len = pMetaRsp->metaRspLen >= 0 ? pMetaRsp->metaRspLen : 0;
    raw->raw_type = pMetaRsp->resMsgType;
    uDebug("tmq get raw type meta:%p", raw);
  } else if (TD_RES_TMQ(res)) {
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw));
    raw->raw_type = RES_TYPE__TMQ;
    uDebug("tmq get raw type data:%p", raw);
  } else if (TD_RES_TMQ_METADATA(res)) {
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw));
    raw->raw_type = RES_TYPE__TMQ_METADATA;
    uDebug("tmq get raw type metadata:%p", raw);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    SMqBatchMetaRsp* pBatchRsp = &rspObj->batchMetaRsp;
    raw->raw = pBatchRsp->pMetaBuff;
    raw->raw_len = pBatchRsp->metaBuffLen;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw batch meta:%p", raw);
  } else if (TD_RES_TMQ_RAW(res)) {
    SMqDataRsp* pDataRsp = &rspObj->dataRsp;
    raw->raw = pDataRsp->rawData;
    pDataRsp->rawData = NULL;  // the pointer now lives only in raw
    raw->raw_len = pDataRsp->len;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw raw:%p", raw);
  } else {
    uError("tmq get raw error type:%d", *(int8_t*)res);
    code = TSDB_CODE_TMQ_INVALID_MSG;
  }

end:
  RAW_LOG_END
  return code;
}
2684

2685
/**
 * Release the buffer referenced by a raw descriptor and clear the error
 * message buffer.
 *
 *   - RES_TYPE__TMQ / RES_TYPE__TMQ_METADATA: `raw.raw` itself is freed.
 *   - RES_TYPE__TMQ_RAWDATA (non-NULL): the pointer is moved back by
 *     sizeof(SMqRspHead) before freeing.
 *     NOTE(review): this presumes `raw.raw` points just past an SMqRspHead
 *     at the start of the allocation - confirm against the producer.
 *   - any other type: nothing is freed here.
 *
 * `terrMsg` is zeroed in every case.
 *
 * @param raw  Descriptor to release, passed by value.
 */
void tmq_free_raw(tmq_raw_data raw) {
  uDebug("tmq free raw data type:%d", raw.raw_type);
  if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
    taosMemoryFree(raw.raw);
  } else if (raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL) {
    // Use a signed negative offset. Negating the size_t from sizeof yields a
    // huge unsigned value, and reaching the allocation start would then rely
    // on pointer wraparound, which is undefined behaviour.
    taosMemoryFree(POINTER_SHIFT(raw.raw, -(int32_t)sizeof(SMqRspHead)));
  }
  (void)memset(terrMsg, 0, ERR_MSG_LEN);
}
74,442✔
2694

2695
static int32_t writeRawInit() {
108,627✔
2696
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
113,166✔
2697
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
4,539✔
2698
    if (old == 0) {
4,539✔
2699
      int32_t code = initRawCacheHash();
4,539✔
2700
      if (code != 0) {
4,539✔
2701
        uError("tmq writeRawImpl init error:%d", code);
×
2702
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
2703
        return code;
×
2704
      }
2705
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
4,539✔
2706
    }
2707
  }
2708

2709
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
108,627✔
2710
    return TSDB_CODE_INTERNAL_ERROR;
×
2711
  }
2712
  return 0;
108,627✔
2713
}
2714

2715
/**
 * Dispatch a raw message to the handler that matches its type.
 *
 * Makes sure the raw-write state is initialised, then forwards
 * (`taos`, `buf`, `len`) to the handler selected by `type`. Both
 * TDMT_VND_CREATE_STB and TDMT_VND_ALTER_STB are handled by taosCreateStb().
 *
 * @param taos  Connection handle; must not be NULL.
 * @param buf   Message buffer; must not be NULL.
 * @param len   Length of `buf` in bytes.
 * @param type  Message type (a TDMT_VND_* or RES_TYPE__TMQ* value).
 * @return The handler's result, TSDB_CODE_INVALID_PARA for NULL arguments or
 *         an unknown type, TSDB_CODE_INTERNAL_ERROR if initialisation failed.
 */
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
  if (buf == NULL || taos == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  if (writeRawInit() != 0) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  // Super table creation and alteration share one handler.
  if (type == TDMT_VND_CREATE_STB || type == TDMT_VND_ALTER_STB) {
    return taosCreateStb(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_STB) {
    return taosDropStb(taos, buf, len);
  }
  if (type == TDMT_VND_CREATE_TABLE) {
    return taosCreateTable(taos, buf, len);
  }
  if (type == TDMT_VND_ALTER_TABLE) {
    return taosAlterTable(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_TABLE) {
    return taosDropTable(taos, buf, len);
  }
  if (type == TDMT_VND_DELETE) {
    return taosDeleteData(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_METADATA) {
    return tmqWriteRawMetaDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_RAWDATA) {
    return tmqWriteRawRawDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ) {
    return tmqWriteRawDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_BATCH_META) {
    return tmqWriteBatchMetaDataImpl(taos, buf, len);
  }

  return TSDB_CODE_INVALID_PARA;
}
2749

2750
/**
 * Write a raw descriptor to the server through `taos`.
 *
 * The descriptor is rejected, with an error message recorded, unless `taos`
 * and `raw.raw` are non-NULL and `raw.raw_len` is positive. Otherwise the
 * global error message is cleared and the payload is passed to writeRawImpl().
 *
 * @param taos  Connection handle.
 * @param raw   Raw descriptor (buffer, length and type), passed by value.
 * @return TSDB_CODE_INVALID_PARA for invalid input, otherwise the result of
 *         writeRawImpl().
 */
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
  if (!(taos != NULL && raw.raw != NULL && raw.raw_len > 0)) {
    SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
    return TSDB_CODE_INVALID_PARA;
  }

  // Start from a clean global error message before doing the write.
  taosClearErrMsg();

  int32_t code = writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
  return code;
}
2759

2760
/**
 * Replay every meta message contained in an encoded batch meta response.
 *
 * The buffer is decoded into an SMqBatchMetaRsp. Each entry is expected to
 * start with an SMqRspHead; the remainder is decoded as an SMqMetaRsp and
 * passed to writeRawImpl() with its own message type. Processing stops at the
 * first failure.
 *
 * @param taos     Connection handle; must not be NULL.
 * @param meta     Encoded batch meta response; must not be NULL.
 * @param metaLen  Length of `meta` in bytes.
 * @return TSDB_CODE_SUCCESS when every entry was written,
 *         TSDB_CODE_INVALID_PARA for NULL arguments,
 *         TSDB_CODE_TMQ_INVALID_MSG when an entry is shorter than its header,
 *         or the first decode/write error encountered.
 */
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SMqBatchMetaRsp rsp = {0};
  SDecoder        coder = {0};
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;

  RAW_LOG_START
  // decode and process req
  tDecoderInit(&coder, meta, metaLen);
  RAW_RETURN_CHECK(tDecodeMqBatchMetaRsp(&coder, &rsp));
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // The buffer comes from the caller. An entry shorter than the header
    // would make the subtraction below wrap to a huge decoder size and let
    // the decoder read past the entry.
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      uError("tmq write batch meta invalid len:%d at index:%d", *len, i);
      code = TSDB_CODE_TMQ_INVALID_MSG;
      lino = __LINE__;
      goto end;
    }

    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    // Skip the response head; only the body is an encoded SMqMetaRsp.
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    RAW_RETURN_CHECK(tDecodeMqMetaRsp(&metaCoder, &metaRsp));
    code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
    tDeleteMqMetaRsp(&metaRsp);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
  }

end:
  tDeleteMqBatchMetaRsp(&rsp);
  RAW_LOG_END
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc