• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4995

18 Mar 2026 06:26AM UTC coverage: 71.996% (-0.2%) from 72.244%
#4995

push

travis-ci

web-flow
merge: from main to 3.0 branch #34835

2 of 4 new or added lines in 2 files covered. (50.0%)

5312 existing lines in 167 files now uncovered.

244665 of 339830 relevant lines covered (72.0%)

135013613.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.19
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <string.h>
17
#include "cJSON.h"
18
#include "clientInt.h"
19
#include "osMemPool.h"
20
#include "parser.h"
21
#include "taosdef.h"
22
#include "tarray.h"
23
#include "tbase64.h"
24
#include "tcol.h"
25
#include "tcompression.h"
26
#include "tdatablock.h"
27
#include "tdataformat.h"
28
#include "tdef.h"
29
#include "tglobal.h"
30
#include "tmsgtype.h"
31

32
/* Emits a debug line marking entry into the enclosing function. */
#define RAW_LOG_START uDebug("%s start", __func__);

/*
 * Emits the exit log of the enclosing function: an error line carrying `lino`
 * and tstrerror(code) when `code` is non-zero, a debug line otherwise.
 * `code` and `lino` must be in scope where the macro is expanded.
 */
#define RAW_LOG_END                                                           \
  if (code != 0) {                                                            \
    uError("%s failed at line:%d since:%s", __func__, lino, tstrerror(code)); \
  } else {                                                                    \
    uDebug("%s return success", __func__);                                    \
  }
40

41
/*
 * Bails out to the local `end:` label when the pointer expression `c` is NULL,
 * recording the current line in `lino` and copying `terrno` into `code`.
 * `code`, `lino`, `terrno` and an `end:` label must be available where the
 * macro is expanded.
 * The argument is parenthesized so that a compound expression (for example a
 * conditional expression) is compared against NULL as a whole.
 */
#define RAW_NULL_CHECK(c) \
  do {                    \
    if ((c) == NULL) {    \
      lino = __LINE__;    \
      code = terrno;      \
      goto end;           \
    }                     \
  } while (0)
49

50
/*
 * Bails out to the local `end:` label when condition `c` does not hold,
 * recording the current line in `lino` and setting `code` to
 * TSDB_CODE_INVALID_PARA.
 * `code`, `lino` and an `end:` label must be available where the macro is
 * expanded.
 */
#define RAW_FALSE_CHECK(c)           \
  do {                               \
    if (!(c)) {                      \
      lino = __LINE__;               \
      code = TSDB_CODE_INVALID_PARA; \
      goto end;                      \
    }                                \
  } while (0)
58

59
/*
 * Evaluates `c` once, stores the result in `code`, and bails out to the local
 * `end:` label (after recording the current line in `lino`) when that result
 * is non-zero.
 * `code`, `lino` and an `end:` label must be available where the macro is
 * expanded.
 */
#define RAW_RETURN_CHECK(c) \
  do {                      \
    code = (c);             \
    if (code != 0) {        \
      lino = __LINE__;      \
      goto end;             \
    }                       \
  } while (0)
67

68
/* printf-style fragment that tags a log line with a connection id and a query id, both printed as 64-bit hex. */
#define LOG_ID_TAG "connId:0x%" PRIx64 ", QID:0x%" PRIx64

/* Argument pair matching LOG_ID_TAG; `taos` and `pRequest` must be in scope where it is expanded. */
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId

/* NOTE(review): presumably the version of the TMQ meta JSON format; its users are outside this part of the file. */
#define TMQ_META_VERSION "1.0"
72

73
/*
 * Creates an empty JSON object and appends it to `array`.
 * Returns the appended object; when cJSON_AddItemToArray reports failure the
 * object is passed to cJSON_Delete and NULL is returned.
 */
static cJSON* tmqAddObjectToArray(cJSON* array) {
  cJSON* obj = cJSON_CreateObject();
  if (!cJSON_AddItemToArray(array, obj)) {
    cJSON_Delete(obj);
    return NULL;
  }
  return obj;
}
81

UNCOV
82
/*
 * Creates a JSON string node holding `str` and appends it to `array`.
 * Returns the appended node; when cJSON_AddItemToArray reports failure the
 * node is passed to cJSON_Delete and NULL is returned.
 */
static cJSON* tmqAddStringToArray(cJSON* array, const char* str) {
  cJSON* node = cJSON_CreateString(str);
  if (!cJSON_AddItemToArray(array, node)) {
    cJSON_Delete(node);
    return NULL;
  }
  return node;
}
90
 
91
/*
 * Add a string / bool / number member named NAME to the cJSON object JSON and
 * leave through RAW_NULL_CHECK (i.e. `goto end`) when cJSON returns NULL.
 * Each expansion deliberately ends in ';' so the macros also work at call
 * sites written without a trailing semicolon.
 */
#define ADD_TO_JSON_STRING(JSON, NAME, VALUE) RAW_NULL_CHECK(cJSON_AddStringToObject(JSON, NAME, VALUE));

#define ADD_TO_JSON_BOOL(JSON, NAME, VALUE) RAW_NULL_CHECK(cJSON_AddBoolToObject(JSON, NAME, VALUE));

#define ADD_TO_JSON_NUMBER(JSON, NAME, VALUE) RAW_NULL_CHECK(cJSON_AddNumberToObject(JSON, NAME, VALUE));
99

100
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen);
101
/*
 * Offsets `suid` by the 32-bit Murmur hash of the database name.
 * When `db` is NULL the id is returned unchanged.
 */
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
  return (db != NULL) ? suid + MurmurHash3_32(db, strlen(db)) : suid;
}
107

108
static int32_t getLength(int8_t type, int32_t bytes, int32_t typeMod) {
91,100✔
109
  int32_t length = 0;
91,100✔
110
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_GEOMETRY) {
91,100✔
111
    length = bytes - VARSTR_HEADER_SIZE;
11,952✔
112
  } else if (type == TSDB_DATA_TYPE_NCHAR) {
79,148✔
113
    length = (bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
7,067✔
114
  } else if (IS_STR_DATA_BLOB(type)) {
72,081✔
UNCOV
115
    length = bytes - BLOBSTR_HEADER_SIZE;
×
116
  } else if (type == TSDB_DATA_TYPE_DECIMAL || type == TSDB_DATA_TYPE_DECIMAL64) {
72,081✔
117
    length = typeMod;
360✔
118
  }
119
  return length;
91,100✔
120
}
121

122
/*
 * Builds the JSON object for a "create" meta event of a super or normal table.
 *
 * schemaRow    column schema (required)
 * schemaTag    tag schema; may be NULL, in which case "tags" stays an empty array
 * pExtSchemas  optional per-column extended schema; its typeMod is fed to getLength()
 * name         table name (required)
 * id           not used by this function
 * t            table type: TSDB_SUPER_TABLE yields "super", anything else "normal"
 * isVirtual    adds "isVirtual" and enables the per-column "ref" object
 * colRef       optional column references, consulted only when isVirtual is set
 * pColCmprRow  per-column compression settings (required); a column without an
 *              entry gets the default algorithm for its type
 * pJson        receives the JSON object (required). It is assigned on failure
 *              too and may then hold a partially built tree or NULL; this
 *              function never frees it.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when a required argument
 * is missing, or the value of terrno when a step yields NULL.
 */
static int32_t buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, SExtSchema* pExtSchemas, char* name, int64_t id, int8_t t,
                                 bool isVirtual, SColRefWrapper* colRef, SColCmprWrapper* pColCmprRow, cJSON** pJson) {
  if (schemaRow == NULL || name == NULL || pColCmprRow == NULL || pJson == NULL) {
    uError("invalid parameter, schemaRow:%p, name:%p, pColCmprRow:%p, pJson:%p", schemaRow, name, pColCmprRow, pJson);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  // No compression entries at all: every column uses the default for its type.
  const bool buildDefaultCompress = (pColCmprRow->nCols <= 0);
  RAW_LOG_START
  cJSON* json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);

  ADD_TO_JSON_STRING(json, "type", "create");
  ADD_TO_JSON_STRING(json, "tableType", (t == TSDB_SUPER_TABLE ? "super" : "normal"));
  if (isVirtual) {
    ADD_TO_JSON_BOOL(json, "isVirtual", isVirtual);
  }
  ADD_TO_JSON_STRING(json, "tableName", name);

  cJSON* columns = cJSON_AddArrayToObject(json, "columns");
  RAW_NULL_CHECK(columns);

  for (int i = 0; i < schemaRow->nCols; i++) {
    cJSON* column = tmqAddObjectToArray(columns);
    RAW_NULL_CHECK(column);
    SSchema* s = schemaRow->pSchema + i;
    ADD_TO_JSON_STRING(column, "name", s->name);
    ADD_TO_JSON_NUMBER(column, "type", s->type);
    int32_t typeMod = 0;
    if (pExtSchemas != NULL) {
      typeMod = pExtSchemas[i].typeMod;
    }
    int32_t length = getLength(s->type, s->bytes, typeMod);
    if (length > 0) {
      ADD_TO_JSON_NUMBER(column, "length", length);
    }

    if (isVirtual && colRef != NULL && i < colRef->nCols) {
      SColRef* pColRef = colRef->pColRef + i;
      if (pColRef->hasRef) {
        cJSON* ref = cJSON_AddObjectToObject(column, "ref");
        RAW_NULL_CHECK(ref);
        ADD_TO_JSON_STRING(ref, "refDbName", pColRef->refDbName);
        ADD_TO_JSON_STRING(ref, "refTableName", pColRef->refTableName);
        ADD_TO_JSON_STRING(ref, "refColName", pColRef->refColName);
      }
    }
    ADD_TO_JSON_BOOL(column, "isPrimarykey", (s->flags & COL_IS_KEY));

    // Never index past the compression array: a column without an entry is
    // treated like the "no compression info" case and gets the type default.
    uint32_t alg = 0;
    if (buildDefaultCompress || i >= pColCmprRow->nCols) {
      alg = createDefaultColCmprByType(s->type);
    } else {
      alg = pColCmprRow->pColCmpr[i].alg;
    }
    const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(alg));
    RAW_NULL_CHECK(encode);
    const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(alg));
    RAW_NULL_CHECK(compress);
    const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(alg));
    RAW_NULL_CHECK(level);

    ADD_TO_JSON_STRING(column, "encode", encode);
    ADD_TO_JSON_STRING(column, "compress", compress);
    ADD_TO_JSON_STRING(column, "level", level);
  }

  cJSON* tags = cJSON_AddArrayToObject(json, "tags");
  RAW_NULL_CHECK(tags);

  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
    cJSON* tag = tmqAddObjectToArray(tags);
    RAW_NULL_CHECK(tag);
    SSchema* s = schemaTag->pSchema + i;
    ADD_TO_JSON_STRING(tag, "name", s->name);
    ADD_TO_JSON_NUMBER(tag, "type", s->type);
    // Tags carry no extended schema here, so typeMod is always 0.
    int32_t length = getLength(s->type, s->bytes, 0);
    if (length > 0) {
      ADD_TO_JSON_NUMBER(tag, "length", length);
    }
  }

end:
  *pJson = json;
  RAW_LOG_END
  return code;
}
218

219
/*
 * Adds one compression attribute decoded from `para` to `json`.
 * Only the first non-zero component is emitted, checked in this order:
 * "encode" (L1 type), "compress" (L2 type), "level" (L2 level). When all three
 * are zero nothing is added.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when `json` is NULL, or the
 * value of terrno when a name lookup or the JSON insertion yields NULL.
 */
static int32_t setCompressOption(cJSON* json, uint32_t para) {
  if (json == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  const uint8_t l1Encode = COMPRESS_L1_TYPE_U32(para);
  const uint8_t l2Compress = COMPRESS_L2_TYPE_U32(para);
  const uint8_t l2Level = COMPRESS_L2_TYPE_LEVEL_U32(para);
  int32_t       code = 0;
  int32_t       lino = 0;
  RAW_LOG_START

  if (l1Encode != 0) {
    const char* text = columnEncodeStr(l1Encode);
    RAW_NULL_CHECK(text);
    ADD_TO_JSON_STRING(json, "encode", text);
  } else if (l2Compress != 0) {
    const char* text = columnCompressStr(l2Compress);
    RAW_NULL_CHECK(text);
    ADD_TO_JSON_STRING(json, "compress", text);
  } else if (l2Level != 0) {
    const char* text = columnLevelStr(l2Level);
    RAW_NULL_CHECK(text);
    ADD_TO_JSON_STRING(json, "level", text);
  }

end:
  RAW_LOG_END
  return code;
}
252
/*
 * Deserializes an SMAlterStbReq from `alterData` and renders it as the JSON of
 * an "alter" meta event for a super table.
 *
 * The object always carries "type", "tableType", "tableName" and "alterType";
 * the remaining members depend on the alter type (column name, type, length,
 * new name, or one compression attribute).
 *
 * `*pJson` is assigned on every path that gets past argument validation: the
 * finished object on success, NULL when add / update-bytes requests do not
 * carry exactly one field, and a possibly partial object (or NULL) on other
 * failures. The deserialized request is released before returning.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for bad arguments or a wrong
 * field count, or the error produced by the failing step.
 */
static int32_t buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) {
  if (alterData == NULL || pJson == NULL) {
    uError("invalid parameter in %s alterData:%p", __func__, alterData);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t       code = 0;
  int32_t       lino = 0;
  cJSON*        root = NULL;
  SMAlterStbReq req = {0};
  RAW_LOG_START

  RAW_RETURN_CHECK(tDeserializeSMAlterStbReq(alterData, alterDataLen, &req));

  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);
  ADD_TO_JSON_STRING(root, "type", "alter");

  SName tableName = {0};
  RAW_RETURN_CHECK(tNameFromString(&tableName, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
  ADD_TO_JSON_STRING(root, "tableType", "super");
  ADD_TO_JSON_STRING(root, "tableName", tableName.tname);
  ADD_TO_JSON_NUMBER(root, "alterType", req.alterType);

  // Adding a tag/column and resizing a tag/column must come with exactly one field.
  const bool wantsSingleField =
      req.alterType == TSDB_ALTER_TABLE_ADD_TAG || req.alterType == TSDB_ALTER_TABLE_ADD_COLUMN ||
      req.alterType == TSDB_ALTER_TABLE_UPDATE_TAG_BYTES || req.alterType == TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES;
  if (wantsSingleField && taosArrayGetSize(req.pFields) != 1) {
    uError("invalid field num %" PRIzu " for alter type %d", taosArrayGetSize(req.pFields), req.alterType);
    cJSON_Delete(root);
    root = NULL;
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  switch (req.alterType) {
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      TAOS_FIELD* added = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(added);
      ADD_TO_JSON_STRING(root, "colName", added->name);
      ADD_TO_JSON_NUMBER(root, "colType", added->type);
      int32_t typeMod = 0;
      if (taosArrayGetSize(req.pTypeMods) > 0) {
        typeMod = *(STypeMod*)taosArrayGet(req.pTypeMods, 0);
      }
      int32_t colLength = getLength(added->type, added->bytes, typeMod);
      if (colLength > 0) {
        ADD_TO_JSON_NUMBER(root, "colLength", colLength);
      }
      break;
    }

    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      SFieldWithOptions* added = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(added);
      ADD_TO_JSON_STRING(root, "colName", added->name);
      ADD_TO_JSON_NUMBER(root, "colType", added->type);
      int32_t typeMod = 0;
      if (taosArrayGetSize(req.pTypeMods) > 0) {
        typeMod = *(STypeMod*)taosArrayGet(req.pTypeMods, 0);
      }
      int32_t colLength = getLength(added->type, added->bytes, typeMod);
      if (colLength > 0) {
        ADD_TO_JSON_NUMBER(root, "colLength", colLength);
      }
      RAW_RETURN_CHECK(setCompressOption(root, added->compress));
      break;
    }

    case TSDB_ALTER_TABLE_DROP_TAG:
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* dropped = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(dropped);
      ADD_TO_JSON_STRING(root, "colName", dropped->name);
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      TAOS_FIELD* resized = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(resized);
      ADD_TO_JSON_STRING(root, "colName", resized->name);
      ADD_TO_JSON_NUMBER(root, "colType", resized->type);
      int32_t colLength = getLength(resized->type, resized->bytes, 0);
      if (colLength > 0) {
        ADD_TO_JSON_NUMBER(root, "colLength", colLength);
      }
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      // Field 0 carries the current name, field 1 the new one.
      TAOS_FIELD* before = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(before);
      TAOS_FIELD* after = taosArrayGet(req.pFields, 1);
      RAW_NULL_CHECK(after);
      ADD_TO_JSON_STRING(root, "colName", before->name);
      ADD_TO_JSON_STRING(root, "colNewName", after->name);
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      TAOS_FIELD* target = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(target);
      ADD_TO_JSON_STRING(root, "colName", target->name);
      // For this alter type the `bytes` member is what gets decoded as the compression parameter.
      RAW_RETURN_CHECK(setCompressOption(root, target->bytes));
      break;
    }

    default:
      break;
  }

end:
  tFreeSMAltertbReq(&req);
  *pJson = root;
  RAW_LOG_END
  return code;
}
369

370
/*
 * Decodes a "create super table" request from a meta response and renders it
 * as JSON through buildCreateTableJson().
 *
 * metaRsp  meta response; its payload is an SMsgHead followed by the encoded
 *          SVCreateStbReq
 * pJson    receives the JSON object produced by buildCreateTableJson()
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments or a
 * payload that is missing or shorter than the message head, or the error from
 * decoding / JSON building.
 */
static int32_t processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // Reject payloads that cannot even hold the message head; otherwise the
  // decoder below would be handed a bogus pointer or a negative length.
  if (metaRsp->metaRsp == NULL || metaRsp->metaRspLen < (int32_t)sizeof(SMsgHead)) {
    uError("invalid meta response in %s, metaRspLen:%d", __func__, (int)metaRsp->metaRspLen);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SVCreateStbReq req = {0};
  SDecoder       coder = {0};

  RAW_LOG_START
  // The encoded request starts right after the message head.
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);

  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&coder, &req));
  RAW_RETURN_CHECK(buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.pExtSchemas, req.name, req.suid,
                                        TSDB_SUPER_TABLE, req.virtualStb, NULL, &req.colCmpr, pJson));

end:
  tDecoderClear(&coder);
  RAW_LOG_END
  return code;
}
395

396
/*
 * Decodes an "alter super table" meta response and renders it as JSON.
 * The payload is decoded as an SVCreateStbReq; its alterOriData /
 * alterOriDataLen members are then handed to buildAlterSTableJson().
 *
 * metaRsp  meta response; its payload is an SMsgHead followed by the encoded
 *          request
 * pJson    receives the JSON object produced by buildAlterSTableJson()
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments or a
 * payload that is missing or shorter than the message head, or the error from
 * decoding / JSON building.
 */
static int32_t processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // Reject payloads that cannot even hold the message head; otherwise the
  // decoder below would be handed a bogus pointer or a negative length.
  if (metaRsp->metaRsp == NULL || metaRsp->metaRspLen < (int32_t)sizeof(SMsgHead)) {
    uError("invalid meta response in %s, metaRspLen:%d", __func__, (int)metaRsp->metaRspLen);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateStbReq req = {0};
  SDecoder       coder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  RAW_LOG_START

  // The encoded request starts right after the message head.
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);

  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&coder, &req));
  RAW_RETURN_CHECK(buildAlterSTableJson(req.alterOriData, req.alterOriDataLen, pJson));

end:
  tDecoderClear(&coder);
  RAW_LOG_END
  return code;
}
420

421
/*
 * Fills `json` with the description of one child table taken from `pCreateReq`:
 * "tableName", "refs" (virtual child tables only, one entry per column that
 * has a reference), "using", "tagNum" and a "tags" array.
 *
 * A JSON tag is emitted as a single entry whose value is the text produced by
 * parseTagDatatoJson(). Other tags are emitted one by one: variable-length
 * values as strings converted with dataConverToStr(), everything else as a
 * number. Processing stops without an error when a JSON tag has no entries or
 * when a blob-typed tag value is encountered.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
 * error of the failing step. All temporaries are released before returning.
 */
static int32_t buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
  if (json == NULL || pCreateReq == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  STag*   tagData = (STag*)pCreateReq->ctb.pTag;
  SArray* tagNames = pCreateReq->ctb.tagName;
  uint8_t tagCount = pCreateReq->ctb.tagNum;
  SArray* tagVals = NULL;
  char*   jsonText = NULL;
  char*   valueBuf = NULL;
  RAW_LOG_START

  ADD_TO_JSON_STRING(json, "tableName", pCreateReq->name);

  if (pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) {
    cJSON* refs = cJSON_AddArrayToObject(json, "refs");
    RAW_NULL_CHECK(refs);

    for (int col = 0; col < pCreateReq->colRef.nCols; col++) {
      SColRef* colRef = pCreateReq->colRef.pColRef + col;
      if (colRef->hasRef) {
        cJSON* ref = tmqAddObjectToArray(refs);
        RAW_NULL_CHECK(ref);
        ADD_TO_JSON_STRING(ref, "colName", colRef->colName);
        ADD_TO_JSON_STRING(ref, "refDbName", colRef->refDbName);
        ADD_TO_JSON_STRING(ref, "refTableName", colRef->refTableName);
        ADD_TO_JSON_STRING(ref, "refColName", colRef->refColName);
      }
    }
  }

  ADD_TO_JSON_STRING(json, "using", pCreateReq->ctb.stbName);
  ADD_TO_JSON_NUMBER(json, "tagNum", tagCount);

  cJSON* tags = cJSON_AddArrayToObject(json, "tags");
  RAW_NULL_CHECK(tags);
  RAW_RETURN_CHECK(tTagToValArray(tagData, &tagVals));

  if (tTagIsJson(tagData)) {
    if (tagData->nTag == 0) {
      uWarn("p->nTag == 0");
      goto end;
    }
    parseTagDatatoJson(tagData, &jsonText, NULL);
    RAW_NULL_CHECK(jsonText);
    cJSON* tag = tmqAddObjectToArray(tags);
    RAW_NULL_CHECK(tag);
    STagVal* firstVal = taosArrayGet(tagVals, 0);
    RAW_NULL_CHECK(firstVal);
    char* firstName = taosArrayGet(tagNames, 0);
    RAW_NULL_CHECK(firstName);
    ADD_TO_JSON_STRING(tag, "name", firstName);
    ADD_TO_JSON_NUMBER(tag, "type", TSDB_DATA_TYPE_JSON);
    ADD_TO_JSON_STRING(tag, "value", jsonText);
    goto end;
  }

  for (int idx = 0; idx < taosArrayGetSize(tagVals); idx++) {
    STagVal* item = (STagVal*)taosArrayGet(tagVals, idx);
    RAW_NULL_CHECK(item);
    cJSON* tag = tmqAddObjectToArray(tags);
    RAW_NULL_CHECK(tag);
    char* itemName = taosArrayGet(tagNames, idx);
    RAW_NULL_CHECK(itemName);
    ADD_TO_JSON_STRING(tag, "name", itemName);
    ADD_TO_JSON_NUMBER(tag, "type", item->type);

    if (!(IS_VAR_DATA_TYPE(item->type))) {
      double number = 0;
      GET_TYPED_DATA(number, double, item->type, &item->i64, 0);  // currently tag type can't be decimal, so pass 0 as typeMod
      ADD_TO_JSON_NUMBER(tag, "value", number)
      continue;
    }

    if (IS_STR_DATA_BLOB(item->type)) {
      goto end;
    }

    // VARBINARY gets room for two output characters per input byte plus two
    // extra; every variant adds three more bytes on top.
    int64_t bufSize = 0;
    if (item->type == TSDB_DATA_TYPE_VARBINARY) {
      bufSize = item->nData * 2 + 2 + 3;
    } else {
      bufSize = item->nData + 3;
    }
    valueBuf = taosMemoryCalloc(bufSize, 1);
    RAW_NULL_CHECK(valueBuf);
    code = dataConverToStr(valueBuf, bufSize, item->type, item->pData, item->nData, NULL);
    if (code != TSDB_CODE_SUCCESS) {
      uError("convert tag value to string failed");
      goto end;
    }

    ADD_TO_JSON_STRING(tag, "value", valueBuf)
    taosMemoryFreeClear(valueBuf);
  }

end:
  taosMemoryFree(jsonText);
  taosArrayDestroy(tagVals);
  taosMemoryFree(valueBuf);
  RAW_LOG_END
  return code;
}
530

531
/*
 * Builds the JSON of a "create" meta event for child tables.
 *
 * The first request is described directly on the top-level object. A
 * "createList" array is always present; it is filled only when nReqs > 1, in
 * which case it holds one element per request, the first one included.
 *
 * `*pJson` is assigned on every path past argument validation and may hold a
 * partially built object (or NULL) when an error is returned.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
 * error of the failing step.
 */
static int32_t buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) {
  if (pJson == NULL || pCreateReq == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  RAW_LOG_START
  cJSON* root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);
  ADD_TO_JSON_STRING(root, "type", "create");
  ADD_TO_JSON_STRING(root, "tableType", "child");

  RAW_RETURN_CHECK(buildChildElement(root, pCreateReq));
  cJSON* batch = cJSON_AddArrayToObject(root, "createList");
  RAW_NULL_CHECK(batch);

  if (nReqs > 1) {
    for (int idx = 0; idx < nReqs; idx++) {
      cJSON* entry = tmqAddObjectToArray(batch);
      RAW_NULL_CHECK(entry);
      RAW_RETURN_CHECK(buildChildElement(entry, pCreateReq + idx));
    }
  }

end:
  *pJson = root;
  RAW_LOG_END
  return code;
}
560

561
/*
 * Decodes a batch "create table" request from a meta response and renders it
 * as JSON.
 *
 * The type of the first request selects the builder: child / virtual child
 * tables go through buildCreateCTableJson() with the whole batch, normal /
 * virtual normal tables through buildCreateTableJson() with the first request
 * only. An empty batch or any other table type leaves `*pJson` untouched and
 * still returns success.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments or a
 * payload that is missing or shorter than the message head, or the error from
 * decoding / JSON building. The decoded batch is released before returning.
 */
static int32_t processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // Reject payloads that cannot even hold the message head; otherwise the
  // decoder below would be handed a bogus pointer or a negative length.
  if (metaRsp->metaRsp == NULL || metaRsp->metaRspLen < (int32_t)sizeof(SMsgHead)) {
    uError("invalid meta response in %s, metaRspLen:%d", __func__, (int)metaRsp->metaRspLen);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SDecoder           decoder = {0};
  SVCreateTbBatchReq req = {0};
  RAW_LOG_START
  // The encoded batch starts right after the message head.
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  RAW_RETURN_CHECK(tDecodeSVCreateTbBatchReq(&decoder, &req));

  if (req.nReqs > 0) {
    SVCreateTbReq* pCreateReq = req.pReqs;
    if (pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) {
      RAW_RETURN_CHECK(buildCreateCTableJson(req.pReqs, req.nReqs, pJson));
    } else if (pCreateReq->type == TSDB_NORMAL_TABLE || pCreateReq->type == TSDB_VIRTUAL_NORMAL_TABLE) {
      RAW_RETURN_CHECK(buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->pExtSchemas, pCreateReq->name,
                                            pCreateReq->uid, pCreateReq->type, pCreateReq->type == TSDB_NORMAL_TABLE ? false : true,
                                            &pCreateReq->colRef, &pCreateReq->colCmpr, pJson));
    }
  }

end:
  tDeleteSVCreateTbBatchReq(&req);
  tDecoderClear(&decoder);
  RAW_LOG_END
  return code;
}
595

596
/*
 * Renders the auto-created tables carried by a data response as an
 * unformatted JSON string.
 *
 * rsp     data response; createTableReq / createTableLen hold createTableNum
 *         encoded SVCreateTbReq entries
 * string  receives the text returned by cJSON_PrintUnformatted() on success;
 *         it is not written when an error is returned
 *
 * Every entry must be a child or a normal table. The type of the first entry
 * selects the builder: a normal table is rendered on its own, child tables
 * are rendered as one batch.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments, a
 * non-positive createTableNum or an unsupported table type, or the error of
 * the failing step.
 */
static int32_t processAutoCreateTable(SMqDataRsp* rsp, char** string) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  // Everything the cleanup path touches is initialized before the first check
  // that can jump to `end`, so an early exit never sees indeterminate pointers.
  SDecoder*      decoder = NULL;
  SVCreateTbReq* pCreateReq = NULL;
  cJSON*         pJson = NULL;
  RAW_LOG_START
  RAW_FALSE_CHECK(rsp != NULL && string != NULL);
  RAW_FALSE_CHECK(rsp->createTableNum > 0);

  decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
  RAW_NULL_CHECK(decoder);
  pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
  RAW_NULL_CHECK(pCreateReq);

  // Decode every request up front; each one keeps its own decoder.
  for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) {
    void** data = taosArrayGet(rsp->createTableReq, iReq);
    RAW_NULL_CHECK(data);
    int32_t* len = taosArrayGet(rsp->createTableLen, iReq);
    RAW_NULL_CHECK(len);
    tDecoderInit(&decoder[iReq], *data, *len);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq));

    if (pCreateReq[iReq].type != TSDB_CHILD_TABLE && pCreateReq[iReq].type != TSDB_NORMAL_TABLE) {
      uError("%s failed. pCreateReq[iReq].type:%d invalid", __func__, pCreateReq[iReq].type);
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }
  }

  if (pCreateReq->type == TSDB_NORMAL_TABLE) {
    RAW_RETURN_CHECK(buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->pExtSchemas, pCreateReq->name,
                                          pCreateReq->uid, TSDB_NORMAL_TABLE, false, NULL, &pCreateReq->colCmpr, &pJson));
  } else if (pCreateReq->type == TSDB_CHILD_TABLE) {
    RAW_RETURN_CHECK(buildCreateCTableJson(pCreateReq, rsp->createTableNum, &pJson));
  }

  *string = cJSON_PrintUnformatted(pJson);

  uDebug("auto created table return, sql json:%s", *string);

end:
  RAW_LOG_END
  // The builders store a (possibly partial) tree in pJson even when they
  // fail, so it is released here on every path.
  cJSON_Delete(pJson);
  // decoder and pCreateReq are both NULL whenever rsp is NULL, so the loop
  // condition short-circuits before rsp is dereferenced.
  for (int i = 0; decoder && pCreateReq && i < rsp->createTableNum; i++) {
    tDecoderClear(&decoder[i]);
    taosMemoryFreeClear(pCreateReq[i].comment);
    if (pCreateReq[i].type == TSDB_CHILD_TABLE) {
      taosArrayDestroy(pCreateReq[i].ctb.tagName);
    }
  }
  taosMemoryFree(decoder);
  taosMemoryFree(pCreateReq);
  return code;
}
652

653
static int32_t processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
2,896✔
654
  if (pJson == NULL || metaRsp == NULL) {
2,896✔
655
    uError("invalid parameter in %s", __func__);
×
UNCOV
656
    return TSDB_CODE_INVALID_PARA;
×
657
  }
658
  SDecoder     decoder = {0};
2,896✔
659
  SVAlterTbReq vAlterTbReq = {0};
2,896✔
660
  char*        string = NULL;
2,896✔
661
  cJSON*       json = NULL;
2,896✔
662
  int32_t      code = 0;
2,896✔
663
  int32_t      lino = 0;
2,896✔
664
  char*        buf = NULL;
2,896✔
665
  char*        buf1 = NULL;
2,896✔
666

667
  RAW_LOG_START
2,896✔
668

669
  // decode
670
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
2,896✔
671
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
2,896✔
672
  tDecoderInit(&decoder, data, len);
2,896✔
673
  RAW_RETURN_CHECK(tDecodeSVAlterTbReq(&decoder, &vAlterTbReq));
2,896✔
674

675
  json = cJSON_CreateObject();
2,896✔
676
  RAW_NULL_CHECK(json);
2,896✔
677
  ADD_TO_JSON_STRING(json, "type", "alter");
2,896✔
678

679
  char* tableType = NULL;
2,896✔
680
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
2,896✔
681
      vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL){
2,896✔
UNCOV
682
    tableType = "child";
×
683
  } else if (vAlterTbReq.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF ||
2,896✔
684
             vAlterTbReq.action == TSDB_ALTER_TABLE_REMOVE_COLUMN_REF) {
1,810✔
685
    tableType = "";
2,172✔
686
  } else {
687
    tableType = "normal";
724✔
688
  }
689

690
  ADD_TO_JSON_STRING(json, "tableType", tableType);
2,896✔
691
  ADD_TO_JSON_STRING(json, "tableName", vAlterTbReq.tbName);
2,896✔
692
  ADD_TO_JSON_NUMBER(json, "alterType", vAlterTbReq.action);
2,896✔
693

694
  uDebug("alter table action:%d", vAlterTbReq.action);
2,896✔
695
  switch (vAlterTbReq.action) {
2,896✔
696
    case TSDB_ALTER_TABLE_ADD_COLUMN: 
724✔
697
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF: {
698
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
724✔
699
      ADD_TO_JSON_NUMBER(json, "colType", vAlterTbReq.type);
724✔
700

701
      int32_t length = getLength(vAlterTbReq.type, vAlterTbReq.bytes, vAlterTbReq.typeMod);
724✔
702
      if (length > 0) {
724✔
703
        ADD_TO_JSON_NUMBER(json, "colLength", length);
724✔
704
      }
705

706
      if (vAlterTbReq.action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF) {
724✔
707
        ADD_TO_JSON_STRING(json, "refDbName", vAlterTbReq.refDbName);
724✔
708
        ADD_TO_JSON_STRING(json, "refTbName", vAlterTbReq.refTbName);
724✔
709
        ADD_TO_JSON_STRING(json, "refColName", vAlterTbReq.refColName);
724✔
710
      }
711
      break;
724✔
712
    }
713
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
×
714
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
×
UNCOV
715
      ADD_TO_JSON_NUMBER(json, "colType", vAlterTbReq.type);
×
716

717
      int32_t length = getLength(vAlterTbReq.type, vAlterTbReq.bytes, vAlterTbReq.typeMod);
×
718
      if (length > 0) {
×
UNCOV
719
        ADD_TO_JSON_NUMBER(json, "colLength", length);
×
720
      }
721
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
×
UNCOV
722
      break;
×
723
    }
UNCOV
724
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
×
UNCOV
725
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
×
UNCOV
726
      break;
×
727
    }
UNCOV
728
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
×
UNCOV
729
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
×
UNCOV
730
      ADD_TO_JSON_NUMBER(json, "colType", vAlterTbReq.colModType);
×
UNCOV
731
      int32_t length = getLength(vAlterTbReq.colModType, vAlterTbReq.colModBytes, vAlterTbReq.typeMod);
×
UNCOV
732
      if (length > 0) {
×
UNCOV
733
        ADD_TO_JSON_NUMBER(json, "colLength", length);
×
734
      }
UNCOV
735
      break;
×
736
    }
UNCOV
737
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
×
UNCOV
738
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
×
UNCOV
739
      ADD_TO_JSON_STRING(json, "colNewName", vAlterTbReq.colNewName);
×
UNCOV
740
      break;
×
741
    }
UNCOV
742
    case TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL: {
×
UNCOV
743
      int32_t nTables = taosArrayGetSize(vAlterTbReq.tables);
×
UNCOV
744
      if (nTables <= 0) {
×
UNCOV
745
        code = TSDB_CODE_INVALID_PARA;
×
746
        uError("processAlterTable parse multi tables error");
×
747
        goto end;
×
748
      }
749

UNCOV
750
      cJSON* tables = cJSON_AddArrayToObject(json, "tables");
×
751
      RAW_NULL_CHECK(tables);
×
752

753
      for (int32_t i = 0; i < nTables; i++) {
×
754
        SUpdateTableTagVal* pTable = taosArrayGet(vAlterTbReq.tables, i);
×
UNCOV
755
        cJSON* tableObj = tmqAddObjectToArray(tables);
×
756
        RAW_NULL_CHECK(tableObj);
×
757

758
        ADD_TO_JSON_STRING(tableObj, "tableName", pTable->tbName);
×
759

760
        int32_t nTags = taosArrayGetSize(pTable->tags);
×
UNCOV
761
        cJSON* tags = cJSON_AddArrayToObject(tableObj, "tags");
×
UNCOV
762
        RAW_NULL_CHECK(tags);
×
763

UNCOV
764
        for (int32_t j = 0; j < nTags; j++) {
×
765
          cJSON* member = tmqAddObjectToArray(tags);
×
UNCOV
766
          RAW_NULL_CHECK(member);
×
767

UNCOV
768
          SUpdatedTagVal* pTagVal = taosArrayGet(pTable->tags, j);
×
UNCOV
769
          ADD_TO_JSON_STRING(member, "colName", pTagVal->tagName);
×
770

UNCOV
771
          if (pTagVal->regexp != NULL) {
×
UNCOV
772
            ADD_TO_JSON_STRING(member, "regexp", pTagVal->regexp);
×
773
            ADD_TO_JSON_STRING(member, "replacement", pTagVal->replacement);
×
774
          } else {
UNCOV
775
            bool isNull = pTagVal->isNull;
×
UNCOV
776
            if (!isNull) {
×
UNCOV
777
              int64_t bufSize = 0;
×
UNCOV
778
              if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
×
UNCOV
779
                bufSize = pTagVal->nTagVal * 2 + 2 + 3;
×
780
              } else {
UNCOV
781
                bufSize = pTagVal->nTagVal + 3;
×
782
              }
UNCOV
783
              buf1 = taosMemoryCalloc(bufSize, 1);
×
784
              RAW_NULL_CHECK(buf1);
×
785
              code = dataConverToStr(buf1, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL);
×
786
              if (code != TSDB_CODE_SUCCESS) {
×
787
                uError("convert tag value to string failed");
×
788
                goto end;
×
789
              }
UNCOV
790
              ADD_TO_JSON_STRING(member, "colValue", buf1);
×
UNCOV
791
              taosMemoryFreeClear(buf1);
×
792
            }
793
            ADD_TO_JSON_BOOL(member, "colValueNull", isNull);
×
794
          }
795
        }
796
      }
797
      break;
×
798
    }
799

800
    case TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL: {
×
UNCOV
801
      int32_t nTags = taosArrayGetSize(vAlterTbReq.pMultiTag);
×
802
      if (nTags <= 0) {
×
803
        code = TSDB_CODE_INVALID_PARA;
×
804
        uError("processAlterTable parse multi tags error");
×
805
        goto end;
×
806
      }
807

808
      cJSON* tags = cJSON_AddArrayToObject(json, "tags");
×
809
      RAW_NULL_CHECK(tags);
×
810

811
      for (int32_t i = 0; i < nTags; i++) {
×
UNCOV
812
        cJSON* member = tmqAddObjectToArray(tags);
×
813
        RAW_NULL_CHECK(member);
×
814

815
        SUpdatedTagVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
×
816
        ADD_TO_JSON_STRING(member, "colName", pTagVal->tagName);
×
817

818
        if (pTagVal->regexp != NULL) {
×
819
          ADD_TO_JSON_STRING(member, "regexp", pTagVal->regexp);
×
820
          ADD_TO_JSON_STRING(member, "replacement", pTagVal->replacement);
×
821
        } else {
822
          bool isNull = pTagVal->isNull;
×
823
          if (!isNull) {
×
UNCOV
824
            int64_t bufSize = 0;
×
825
            if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
×
UNCOV
826
              bufSize = pTagVal->nTagVal * 2 + 2 + 3;
×
827
            } else {
UNCOV
828
              bufSize = pTagVal->nTagVal + 3;
×
829
            }
830
            buf1 = taosMemoryCalloc(bufSize, 1);
×
831
            RAW_NULL_CHECK(buf1);
×
832
            code = dataConverToStr(buf1, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL);
×
833
            if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
834
              uError("convert tag value to string failed");
×
UNCOV
835
              goto end;
×
836
            }
UNCOV
837
            ADD_TO_JSON_STRING(member, "colValue", buf1);
×
UNCOV
838
            taosMemoryFreeClear(buf1);
×
839
          }
UNCOV
840
          ADD_TO_JSON_BOOL(member, "colValueNull", isNull);
×
841
        }
842
      }
843

UNCOV
844
      if (vAlterTbReq.whereLen > 0) {
×
UNCOV
845
        ADD_TO_JSON_NUMBER(json, "whereLen", vAlterTbReq.whereLen);
×
UNCOV
846
        char* whereBuf = NULL;
×
UNCOV
847
        code = base64_encode(vAlterTbReq.where, vAlterTbReq.whereLen, &whereBuf);
×
UNCOV
848
        if (code != 0 || whereBuf == NULL) {
×
UNCOV
849
          uError("base64 encode where condition failed");
×
UNCOV
850
          code = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
851
          goto end;
×
852
        }
UNCOV
853
        ADD_TO_JSON_STRING(json, "where", whereBuf);
×
UNCOV
854
        taosMemoryFree(whereBuf);
×
855
      }
UNCOV
856
      break;
×
857
    }
858

UNCOV
859
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
×
UNCOV
860
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
×
UNCOV
861
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
×
UNCOV
862
      break;
×
863
    }
864
    case TSDB_ALTER_TABLE_ALTER_COLUMN_REF: {
1,086✔
865
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
1,086✔
866
      ADD_TO_JSON_STRING(json, "refDbName", vAlterTbReq.refDbName);
1,086✔
867
      ADD_TO_JSON_STRING(json, "refTbName", vAlterTbReq.refTbName);
1,086✔
868
      ADD_TO_JSON_STRING(json, "refColName", vAlterTbReq.refColName);
1,086✔
869
      break;
1,086✔
870
    }
871
    case TSDB_ALTER_TABLE_REMOVE_COLUMN_REF:{
1,086✔
872
      ADD_TO_JSON_STRING(json, "colName", vAlterTbReq.colName);
1,086✔
873
      break;
1,086✔
874
    }
UNCOV
875
    default:
×
UNCOV
876
      break;
×
877
  }
878

879
end:
2,896✔
880
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
2,896✔
UNCOV
881
    taosArrayDestroy(vAlterTbReq.pMultiTag);
×
882
  }
883
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL) {
2,896✔
UNCOV
884
    taosArrayDestroy(vAlterTbReq.pMultiTag);
×
885
  }
886
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL) {
2,896✔
UNCOV
887
    int32_t nTables = taosArrayGetSize(vAlterTbReq.tables);
×
UNCOV
888
    for (int32_t i = 0; i < nTables; i++) {
×
UNCOV
889
      SUpdateTableTagVal* pTable = taosArrayGet(vAlterTbReq.tables, i);
×
UNCOV
890
      taosArrayDestroy(pTable->tags);
×
891
    }
UNCOV
892
    taosArrayDestroy(vAlterTbReq.tables);
×
893
  }
894
  tDecoderClear(&decoder);
2,896✔
895
  taosMemoryFree(buf);
2,896✔
896
  taosMemoryFree(buf1);
2,896✔
897
  *pJson = json;
2,896✔
898
  RAW_LOG_END
2,896✔
899
  return code;
2,896✔
900
}
901

UNCOV
902
/*
 * Decode an SVDropStbReq from a meta response and describe it as JSON:
 * {"type":"drop","tableType":"super","tableName":<name>}.
 *
 * metaRsp  meta response holding the encoded request (must not be NULL)
 * pJson    out: receives the JSON object; assigned on the failure path too
 *          (NULL if the object was never created)
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
 * error code of the failing step.
 */
static int32_t processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t      code = 0;
  int32_t      lino = 0;
  cJSON*       dropJson = NULL;
  SVDropStbReq dropReq = {0};
  SDecoder     dec = {0};

  RAW_LOG_START

  // the encoded request follows the SMsgHead at the front of the response
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  tDecoderInit(&dec, body, bodyLen);
  RAW_RETURN_CHECK(tDecodeSVDropStbReq(&dec, &dropReq));

  dropJson = cJSON_CreateObject();
  RAW_NULL_CHECK(dropJson);
  ADD_TO_JSON_STRING(dropJson, "type", "drop");
  ADD_TO_JSON_STRING(dropJson, "tableType", "super");
  ADD_TO_JSON_STRING(dropJson, "tableName", dropReq.name);

end:
  tDecoderClear(&dec);
  *pJson = dropJson;
  RAW_LOG_END
  return code;
}
UNCOV
932
/*
 * Decode an SDeleteRes from a meta response and describe it as JSON:
 * {"type":"delete","sql":"delete from `tb` where `ts` >= skey and `ts` <= ekey"}.
 *
 * The SQL text is built in a buffer sized from the formatted length, so long
 * table or column names can no longer be silently cut off (a truncated
 * statement may still parse and then cover a different time range).
 *
 * metaRsp  meta response holding the encoded request (must not be NULL)
 * pJson    out: receives the JSON object; assigned on the failure path too
 *          (NULL if the object was never created)
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
 * error code of the failing step.
 */
static int32_t processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  cJSON*     json = NULL;
  char*      sql = NULL;
  int32_t    code = 0;
  int32_t    lino = 0;
  RAW_LOG_START

  // decode and process req
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);

  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeDeleteRes(&coder, &req));

  // first pass only measures the statement, second pass writes it
  int32_t sqlLen = snprintf(NULL, 0, "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                            req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  RAW_FALSE_CHECK(sqlLen > 0);
  sql = taosMemoryCalloc((size_t)sqlLen + 1, 1);
  RAW_NULL_CHECK(sql);
  (void)snprintf(sql, (size_t)sqlLen + 1, "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                 req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  ADD_TO_JSON_STRING(json, "type", "delete");
  ADD_TO_JSON_STRING(json, "sql", sql);

end:
  taosMemoryFree(sql);
  tDecoderClear(&coder);
  *pJson = json;
  RAW_LOG_END
  return code;
}
966

UNCOV
967
/*
 * Decode an SVDropTbBatchReq from a meta response and describe it as JSON:
 * {"type":"drop","tableNameList":[<name>, ...]} with one entry per request
 * in the batch.
 *
 * metaRsp  meta response holding the encoded batch (must not be NULL)
 * pJson    out: receives the JSON object; assigned on the failure path too
 *          (NULL if the object was never created)
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the
 * error code of the failing step.
 */
static int32_t processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = 0;
  int32_t          lino = 0;
  cJSON*           dropJson = NULL;
  SVDropTbBatchReq dropBatch = {0};
  SDecoder         dec = {0};

  RAW_LOG_START

  // the encoded batch follows the SMsgHead at the front of the response
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  tDecoderInit(&dec, body, bodyLen);
  RAW_RETURN_CHECK(tDecodeSVDropTbBatchReq(&dec, &dropBatch));

  dropJson = cJSON_CreateObject();
  RAW_NULL_CHECK(dropJson);
  ADD_TO_JSON_STRING(dropJson, "type", "drop");

  cJSON* nameArr = cJSON_AddArrayToObject(dropJson, "tableNameList");
  RAW_NULL_CHECK(nameArr);

  for (int32_t idx = 0; idx < dropBatch.nReqs; idx++) {
    RAW_NULL_CHECK(tmqAddStringToArray(nameArr, dropBatch.pReqs[idx].name));
  }

end:
  tDecoderClear(&dec);
  *pJson = dropJson;
  RAW_LOG_END
  return code;
}
1002

1003
/*
 * Replay a "create super table" meta message against the connection `taos`.
 *
 * The SVCreateStbReq encoded after the SMsgHead in `meta` is decoded, turned
 * into an SMCreateStbReq (igExists set, source TD_REQ_FROM_TAOX) and sent as
 * TDMT_MND_CREATE_STB. On success the cached table meta is removed from the
 * catalog.
 *
 * taos     connection handle (must not be NULL)
 * meta     raw message, starting with an SMsgHead (must not be NULL)
 * metaLen  length of `meta` in bytes; must cover at least the SMsgHead
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for bad arguments,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED when the connection has no current database,
 * or the error code of the failing step / of the request.
 */
static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
  // a message shorter than its header would make the unsigned payload length below wrap around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateStbReq req = {0};
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SRequestObj*   pRequest = NULL;
  SCmdMsgInfo    pCmdMsg = {0};
  RAW_LOG_START

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&coder, &req));

  SColCmprWrapper* p = &req.colCmpr;

  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
  RAW_NULL_CHECK(pReq.pColumns);
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema*          pSchema = req.schemaRow.pSchema + i;
    SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);

    // Use the decoded compression setting when there is one for this column; otherwise (no
    // compression info at all, or fewer entries than schema columns) fall back to the type
    // default instead of reading past the end of pColCmpr.
    if (p->pColCmpr != NULL && i < p->nCols) {
      field.compress = p->pColCmpr[i].alg;
    } else {
      field.compress = createDefaultColCmprByType(pSchema->type);
    }
    if (req.pExtSchemas) field.typeMod = req.pExtSchemas[i].typeMod;
    RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field));
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  RAW_NULL_CHECK(pReq.pTags);
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
    RAW_NULL_CHECK(taosArrayPush(pReq.pTags, &field));
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = processSuid(req.suid, pRequest->pDb);
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;
  pReq.virtualStb = req.virtualStb;

  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // first call only computes the serialized size, second call fills the buffer
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  RAW_FALSE_CHECK(tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // drop the cached meta of this table from the catalog
    SCatalog* pCatalog = NULL;
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  taosMemoryFree(pCmdMsg.pMsg);
  RAW_LOG_END
  return code;
}
1111

UNCOV
1112
/*
 * Replay a "drop super table" meta message against the connection `taos`.
 *
 * The SVDropStbReq encoded after the SMsgHead in `meta` is decoded, the
 * table's uid is looked up through the catalog and a TDMT_MND_DROP_STB
 * request (igNotExists set, source TD_REQ_FROM_TAOX) is sent. A table that
 * does not exist is treated as success.
 *
 * taos     connection handle (must not be NULL)
 * meta     raw message, starting with an SMsgHead (must not be NULL)
 * metaLen  length of `meta` in bytes; must cover at least the SMsgHead
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for bad arguments,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED when the connection has no current database,
 * or the error code of the failing step / of the request.
 */
static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) {
  // a message shorter than its header would make the unsigned payload length below wrap around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropStbReq req = {0};
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SRequestObj* pRequest = NULL;
  SCmdMsgInfo  pCmdMsg = {0};

  RAW_LOG_START
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVDropStbReq(&coder, &req));
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
  SName            pName = {0};
  toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName);

  // the suid sent to the mnode is the uid known locally for this table name
  STableMeta* pTableMeta = NULL;
  code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    uInfo(LOG_ID_TAG " stable %s not exist, ignore drop", LOG_ID_VALUE, req.name);
    code = TSDB_CODE_SUCCESS;
    taosMemoryFreeClear(pTableMeta);
    goto end;
  }
  RAW_RETURN_CHECK(code);
  pReq.suid = pTableMeta->uid;
  taosMemoryFreeClear(pTableMeta);

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;

  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));

  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  // first call only computes the serialized size, second call fills the buffer
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  RAW_FALSE_CHECK(tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // drop the cached meta of this table; a failure here is reported to the caller
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  RAW_LOG_END
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  // same cleanup as taosCreateStb: release the serialized message if it is still held here
  taosMemoryFree(pCmdMsg.pMsg);
  return code;
}
1201

1202
typedef struct SVgroupCreateTableBatch {
1203
  SVCreateTbBatchReq req;
1204
  SVgroupInfo        info;
1205
  char               dbName[TSDB_DB_NAME_LEN];
1206
} SVgroupCreateTableBatch;
1207

1208
static void destroyCreateTbReqBatch(void* data) {
8,280✔
1209
  if (data == NULL) {
8,280✔
1210
    uError("invalid parameter in %s", __func__);
×
1211
    return;
×
1212
  }
1213
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
8,280✔
1214
  taosArrayDestroy(pTbBatch->req.pArray);
8,280✔
1215
}
1216

1217
static const SSchema* getNormalColSchema(const STableMeta* pTableMeta, const char* pColName) {
6,480✔
1218
  for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
18,725✔
1219
    const SSchema* pSchema = pTableMeta->schema + i;
18,725✔
1220
    if (0 == strcmp(pColName, pSchema->name)) {
18,725✔
1221
      return pSchema;
6,480✔
1222
    }
1223
  }
1224
  return NULL;
×
1225
}
1226

1227
/*
 * Fetch the table meta of dbName.tbName (under account acctId) through the catalog.
 *
 * Returns the meta on success; the caller releases it (the callers in this
 * file use taosMemoryFreeClear). On failure the error is logged, terrno is
 * set to the catalog error code and NULL is returned.
 */
static STableMeta* getTableMeta(SCatalog* pCatalog, SRequestConnInfo* conn, char* dbName, char* tbName, int32_t acctId){
  STableMeta* meta = NULL;
  SName       fullName = {0};

  toName(acctId, dbName, tbName, &fullName);
  int32_t rc = catalogGetTableMeta(pCatalog, conn, &fullName, &meta);
  if (rc == 0) {
    return meta;
  }

  uError("failed to get table meta for reference table:%s.%s", dbName, tbName);
  taosMemoryFreeClear(meta);
  terrno = rc;
  return NULL;
}
1240

1241
/*
 * Validate that column `colName` of the table described by pTableMeta can be
 * referenced by a virtual-table column described by pSchema.
 *
 * Checks, in order:
 *   1. the table's timestamp precision equals `precision`;
 *   2. the table has no composite key (second column flagged COL_IS_KEY);
 *   3. the table is a normal table or a child table;
 *   4. `colName` exists in the table;
 *   5. its type and bytes equal those of pSchema.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE (checks 1
 * and 5) or TSDB_CODE_PAR_INVALID_REF_COLUMN (checks 2-4).
 */
static int32_t checkColRef(STableMeta* pTableMeta, char* colName, uint8_t precision, const SSchema* pSchema) {
  if (precision != pTableMeta->tableInfo.precision) {
    uError("timestamp precision of virtual table and its reference table do not match");
    return TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE;
  }

  // org table cannot has composite primary key
  if (pTableMeta->tableInfo.numOfColumns > 1 && (pTableMeta->schema[1].flags & COL_IS_KEY) != 0) {
    uError("virtual table's column:\"%s\"'s reference can not from table with composite key", colName);
    return TSDB_CODE_PAR_INVALID_REF_COLUMN;
  }

  // org table must be child table or normal table
  if (!(pTableMeta->tableType == TSDB_NORMAL_TABLE || pTableMeta->tableType == TSDB_CHILD_TABLE)) {
    uError("virtual table's column:\"%s\"'s reference can only be normal table or child table", colName);
    return TSDB_CODE_PAR_INVALID_REF_COLUMN;
  }

  const SSchema* refCol = getNormalColSchema(pTableMeta, colName);
  if (refCol == NULL) {
    uError("virtual table's column:\"%s\"'s reference column:\"%s\" not exist", pSchema->name, colName);
    return TSDB_CODE_PAR_INVALID_REF_COLUMN;
  }

  if (refCol->type != pSchema->type || refCol->bytes != pSchema->bytes) {
    uError("virtual table's column:\"%s\"'s type and reference column:\"%s\"'s type not match, %d %d %d %d",
            pSchema->name, colName, pSchema->type, pSchema->bytes, refCol->type, refCol->bytes);
    return TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE;
  }

  return TSDB_CODE_SUCCESS;
}
1279

1280
/*
 * Validate the column reference pColRef (refDbName.refTableName.refColName)
 * for a column described by pSchema: fetch the referenced table's meta and
 * run checkColRef() on it with the given `precision`.
 *
 * Returns terrno when the referenced table's meta cannot be fetched,
 * otherwise the result of checkColRef(). The fetched meta is released here.
 */
static int32_t checkColRefForCreate(SCatalog* pCatalog, SRequestConnInfo* conn, SColRef* pColRef, int32_t acctId, uint8_t precision, SSchema* pSchema) {
  STableMeta* refMeta = getTableMeta(pCatalog, conn, pColRef->refDbName, pColRef->refTableName, acctId);
  if (refMeta == NULL) {
    return terrno;
  }

  int32_t rc = checkColRef(refMeta, pColRef->refColName, precision, pSchema);
  taosMemoryFreeClear(refMeta);
  return rc;
}
1289

1290
/*
 * Validate a column reference for an "add column" operation.
 *
 * Fetches the meta of dbName.tbName and of dbNameSrc.tbNameSrc, then runs
 * checkColRef() on the former for column `colName`, using the precision of
 * the latter and a schema built from colNameSrc / type / bytes.
 *
 * Returns terrno when either meta cannot be fetched, otherwise the result of
 * checkColRef(). Both metas are released here.
 */
static int32_t checkColRefForAdd(SCatalog* pCatalog, SRequestConnInfo* conn, int32_t acctId, char* dbName, char* tbName, char* colName, 
  char* dbNameSrc, char* tbNameSrc, char* colNameSrc, int8_t type, int32_t bytes) {
  int32_t     code = 0;
  // both pointers start as NULL: the cleanup at `end` runs even when the first lookup fails,
  // and must not see an indeterminate pTableMetaSrc
  STableMeta* pTableMeta = NULL;
  STableMeta* pTableMetaSrc = NULL;

  pTableMeta = getTableMeta(pCatalog, conn, dbName, tbName, acctId);
  if (pTableMeta == NULL) {
    code = terrno;
    goto end;
  }
  pTableMetaSrc = getTableMeta(pCatalog, conn, dbNameSrc, tbNameSrc, acctId);
  if (pTableMetaSrc == NULL) {
    code = terrno;
    goto end;
  }

  SSchema pSchema = {.type = type, .bytes = bytes};
  tstrncpy(pSchema.name, colNameSrc, TSDB_COL_NAME_LEN);
  code = checkColRef(pTableMeta, colName, pTableMetaSrc->tableInfo.precision, &pSchema);

end:
  taosMemoryFreeClear(pTableMeta);
  taosMemoryFreeClear(pTableMetaSrc);
  return code;
}
1313

1314
/*
 * Validate a column reference for an "alter column reference" operation.
 *
 * Fetches the meta of dbName.tbName and of dbNameSrc.tbNameSrc, looks up
 * colNameSrc in the latter, then runs checkColRef() on the former for column
 * `colName`, using the precision of the latter and the schema found for
 * colNameSrc.
 *
 * Returns terrno when either meta cannot be fetched,
 * TSDB_CODE_PAR_INVALID_REF_COLUMN when colNameSrc does not exist, otherwise
 * the result of checkColRef(). Both metas are released here.
 */
static int32_t checkColRefForAlter(SCatalog* pCatalog, SRequestConnInfo* conn, int32_t acctId, char* dbName, char* tbName, char* colName, 
  char* dbNameSrc, char* tbNameSrc, char* colNameSrc) {
  int32_t     code = 0;
  // both pointers start as NULL: the cleanup at `end` runs even when the first lookup fails,
  // and must not see an indeterminate pTableMetaSrc
  STableMeta* pTableMeta = NULL;
  STableMeta* pTableMetaSrc = NULL;

  pTableMeta = getTableMeta(pCatalog, conn, dbName, tbName, acctId);
  if (pTableMeta == NULL) {
    code = terrno;
    goto end;
  }
  pTableMetaSrc = getTableMeta(pCatalog, conn, dbNameSrc, tbNameSrc, acctId);
  if (pTableMetaSrc == NULL) {
    code = terrno;
    goto end;
  }
  const SSchema* pSchema = getNormalColSchema(pTableMetaSrc, colNameSrc);
  if (NULL == pSchema) {
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
    uError("virtual table's column:\"%s\" not exist", colNameSrc);
    goto end;
  }

  code = checkColRef(pTableMeta, colName, pTableMetaSrc->tableInfo.precision, pSchema);

end:
  taosMemoryFreeClear(pTableMeta);
  taosMemoryFreeClear(pTableMetaSrc);
  return code;
}
1341

1342
/*
 * Replay a raw create-table message on the connection `taos`.
 *
 * `meta` is an SMsgHead followed by an encoded SVCreateTbBatchReq; `metaLen` is the
 * length of the whole buffer. For every request in the batch the function
 *   - resolves the target vgroup through the catalog,
 *   - forces TD_CREATE_IF_NOT_EXISTS,
 *   - for (virtual) child tables: looks up the super table in the current database,
 *     skips the request if the super table does not exist, replaces the suid with the
 *     one from the catalog, optionally rewrites/validates column references
 *     (tmqWriteRefDB / tmqWriteCheckRef) and rebuilds the tag blob when a tag's column
 *     id differs from the one in the catalog,
 *   - appends the request to a per-vgroup batch.
 * The batches are then serialized and executed as a TDMT_VND_CREATE_TABLE query.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments or a buffer shorter than SMsgHead,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED when no database is selected, otherwise the first
 * error encountered or the code of the executed request.
 */
static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead prefix, otherwise the payload length computed below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;

  RAW_LOG_START
  // rebuilt tag blobs are collected here; the list is destroyed at `end`
  SArray* pTagList = taosArrayInit(0, POINTER_BYTES);
  RAW_NULL_CHECK(pTagList);
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVCreateTbBatchReq(&coder, &req));
  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
    // change tag cid to new cid
    if (pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) {
      STableMeta* pTableMeta = NULL;
      SName       sName = {0};
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        uInfo(LOG_ID_TAG " super table %s not exist, ignore create child table %s", LOG_ID_VALUE,
              pCreateReq->ctb.stbName, pCreateReq->name);
        code = TSDB_CODE_SUCCESS;
        taosMemoryFreeClear(pTableMeta);
        continue;
      }

      RAW_RETURN_CHECK(code);
      // use the suid known to this cluster instead of the one recorded in the message
      pCreateReq->ctb.suid = pTableMeta->uid;

      bool changeDB = strlen(tmqWriteRefDB) > 0;
      for (int32_t i = 0; changeDB && i < pCreateReq->colRef.nCols; i++) {
        SColRef* pColRef = pCreateReq->colRef.pColRef + i;
        tstrncpy(pColRef->refDbName, tmqWriteRefDB, TSDB_DB_NAME_LEN);
      }

      for (int32_t i = 0;
           tmqWriteCheckRef && i < pCreateReq->colRef.nCols && i < pTableMeta->tableInfo.numOfColumns; i++) {
        SColRef* pColRef = pCreateReq->colRef.pColRef + i;
        if (!pColRef || !pColRef->hasRef) continue;
        SSchema* pSchema = pTableMeta->schema + i;
        code = checkColRefForCreate(pCatalog, &conn, pColRef, pTscObj->acctId, pTableMeta->tableInfo.precision,
                                    pSchema);
        if (code != TSDB_CODE_SUCCESS) {
          lino = __LINE__;
          // pTableMeta is local to this iteration, so it has to be released before leaving the loop
          taosMemoryFreeClear(pTableMeta);
          goto end;
        }
      }

      SArray* pTagVals = NULL;
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
      if (code != TSDB_CODE_SUCCESS) {
        uError("create tb invalid tag data %s", pCreateReq->name);
        lino = __LINE__;
        taosMemoryFreeClear(pTableMeta);
        goto end;
      }

      bool rebuildTag = false;
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
        if (tName == NULL) {
          continue;
        }
        // tag schemas follow the column schemas in pTableMeta->schema
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
          SSchema* tag = &pTableMeta->schema[j];
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
            if (pTagVal) {
              if (pTagVal->cid != tag->colId) {
                pTagVal->cid = tag->colId;
                rebuildTag = true;
              }
            } else {
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
            }
          }
        }
      }
      taosMemoryFreeClear(pTableMeta);
      if (rebuildTag) {
        STag* ppTag = NULL;
        code = tTagNew(pTagVals, 1, false, &ppTag);
        taosArrayDestroy(pTagVals);
        pTagVals = NULL;
        if (code != TSDB_CODE_SUCCESS) {
          lino = __LINE__;
          goto end;
        }
        if (NULL == taosArrayPush(pTagList, &ppTag)) {
          code = terrno;
          lino = __LINE__;
          tTagFree(ppTag);
          goto end;
        }
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
      }
      taosArrayDestroy(pTagVals);
    }
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // Until the batch is stored in the hash map nobody else owns its array,
      // so the failure paths below have to release it themselves.
      if (NULL == taosArrayPush(tBatch.req.pArray, pCreateReq)) {
        code = terrno;
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      tBatch.req.source = TD_REQ_FROM_TAOX;
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != TSDB_CODE_SUCCESS) {
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
    }
  }

  // every request was skipped: nothing to send
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_NULL_CHECK(pQuery->pRoot);

  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteSVCreateTbBatchReq(&req);

  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  taosArrayDestroyP(pTagList, NULL);
  RAW_LOG_END
  return code;
}
1533

1534
typedef struct SVgroupDropTableBatch {
1535
  SVDropTbBatchReq req;
1536
  SVgroupInfo      info;
1537
  char             dbName[TSDB_DB_NAME_LEN];
1538
} SVgroupDropTableBatch;
1539

UNCOV
1540
static void destroyDropTbReqBatch(void* data) {
×
UNCOV
1541
  if (data == NULL) {
×
UNCOV
1542
    uError("invalid parameter in %s", __func__);
×
UNCOV
1543
    return;
×
1544
  }
UNCOV
1545
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
×
UNCOV
1546
  taosArrayDestroy(pTbBatch->req.pArray);
×
1547
}
1548

UNCOV
1549
/*
 * Replay a raw drop-table message on the connection `taos`.
 *
 * `meta` is an SMsgHead followed by an encoded SVDropTbBatchReq; `metaLen` is the
 * length of the whole buffer. Every request is marked igNotExists, tables the catalog
 * reports as TSDB_CODE_PAR_TABLE_NOT_EXIST are skipped, the suid is replaced with the
 * one from the catalog, and the remaining requests are grouped per vgroup, serialized
 * and executed as a TDMT_VND_DROP_TABLE query.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments or a buffer shorter than SMsgHead,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED when no database is selected, otherwise the first
 * error encountered or the code of the executed request.
 */
static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead prefix, otherwise the payload length computed below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVDropTbBatchReq(&coder, &req));
  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to drop table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      code = TSDB_CODE_SUCCESS;
      uInfo(LOG_ID_TAG " table %s not exist, ignore drop", LOG_ID_VALUE, pDropReq->name);
      taosMemoryFreeClear(pTableMeta);
      continue;
    }
    RAW_RETURN_CHECK(code);
    // use the suid known to this cluster instead of the one recorded in the message
    tb_uid_t oldSuid = pDropReq->suid;
    pDropReq->suid = pTableMeta->suid;
    taosMemoryFreeClear(pTableMeta);
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
           pDropReq->suid);

    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // Until the batch is stored in the hash map (whose free callback destroys the
      // array), the failure paths below have to release the array themselves.
      if (NULL == taosArrayPush(tBatch.req.pArray, pDropReq)) {
        code = terrno;
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != TSDB_CODE_SUCCESS) {
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
    }
  }

  // every request was skipped: nothing to send
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }
  code = pRequest->code;
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  RAW_LOG_END
  return code;
}
1662

UNCOV
1663
/*
 * Replay a raw delete message on the connection `taos`.
 *
 * `meta` is an SMsgHead followed by an encoded SDeleteRes. The decoded table name,
 * timestamp column name and [skey, ekey] range are turned into a DELETE statement
 * that is executed through taosQueryImpl().
 *
 * TSDB_CODE_PAR_TABLE_NOT_EXIST and TSDB_CODE_PAR_GET_META_ERROR from the query are
 * treated as success. Returns TSDB_CODE_INVALID_PARA for NULL arguments, a buffer
 * shorter than SMsgHead, or a statement that does not fit the SQL buffer.
 */
static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead prefix, otherwise the payload length computed below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  char       sql[1024] = {0};
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);

  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeDeleteRes(&coder, &req));
  int32_t sqlLen = snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                            req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  // A truncated statement could lose its upper time bound and delete more rows
  // than requested, so refuse to run it.
  if (sqlLen < 0 || (size_t)sqlLen >= sizeof(sql)) {
    uError("connId:0x%" PRIx64 " delete statement does not fit into %d bytes", *(int64_t*)taos, (int32_t)sizeof(sql));
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto end;
  }

  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
  RAW_NULL_CHECK(res);
  SRequestObj* pRequest = (SRequestObj*)res;
  code = pRequest->code;
  // a table that is missing on the target side means there is nothing to delete
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));

end:
  RAW_LOG_END
  tDecoderClear(&coder);
  return code;
}
1698

1699
/*
 * Replay a raw alter-table message on the connection `taos`.
 *
 * `meta` is an SMsgHead followed by an encoded SVAlterTbReq. Requests with action
 * TSDB_ALTER_TABLE_UPDATE_OPTIONS are ignored. Otherwise the request is tagged with
 * TD_REQ_FROM_TAOX, optionally redirected to tmqWriteRefDB and validated
 * (tmqWriteCheckRef), re-encoded for the vgroup owning the table in the current
 * database and executed as a TDMT_VND_ALTER_TABLE query.
 *
 * TSDB_CODE_TDB_TABLE_NOT_EXIST from the query is treated as success. Returns
 * TSDB_CODE_INVALID_PARA for NULL arguments or a buffer shorter than SMsgHead and
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED when no database is selected.
 */
static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead prefix, otherwise the payload length computed below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVAlterTbReq   req = {0};
  SDecoder       dcoder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;
  SEncoder       coder = {0};

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);
  RAW_RETURN_CHECK(tDecodeSVAlterTbReq(&dcoder, &req));
  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    uInfo(LOG_ID_TAG " alter table action is UPDATE_OPTIONS, ignore", LOG_ID_VALUE);
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
  pArray = taosArrayInit(1, sizeof(void*));
  RAW_NULL_CHECK(pArray);

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  RAW_NULL_CHECK(pVgData);
  pVgData->vg = pInfo;

  int tlen = 0;
  req.source = TD_REQ_FROM_TAOX;

  if (strlen(tmqWriteRefDB) > 0) {
    req.refDbName = tmqWriteRefDB;
  }

  if (req.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF && tmqWriteCheckRef) {
    RAW_RETURN_CHECK(checkColRefForAlter(pCatalog, &conn, pTscObj->acctId, req.refDbName, req.refTbName,
                                         req.refColName, pRequest->pDb, req.tbName, req.colName));
  } else if (req.action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF && tmqWriteCheckRef) {
    RAW_RETURN_CHECK(checkColRefForAdd(pCatalog, &conn, pTscObj->acctId, req.refDbName, req.refTbName,
                                       req.refColName, pRequest->pDb, req.tbName, req.colName, req.type, req.bytes));
  }

  tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
  RAW_RETURN_CHECK(code);
  tlen += sizeof(SMsgHead);
  void* pMsg = taosMemoryMalloc(tlen);
  RAW_NULL_CHECK(pMsg);
  // Attach the buffer to pVgData immediately so that the cleanup at `end`
  // releases it if encoding or any later step fails.
  pVgData->pData = pMsg;
  pVgData->size = tlen;
  pVgData->numOfTables = 1;

  ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
  ((SMsgHead*)pMsg)->contLen = htonl(tlen);
  void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
  code = tEncodeSVAlterTbReq(&coder, &req);
  RAW_RETURN_CHECK(code);

  RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));

  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code || NULL == pQuery) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);

  // from here on the cleanup at `end` must not touch the array and its data block
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
    code = TSDB_CODE_SUCCESS;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
    if (pRes->res != NULL) {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));

end:
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&dcoder);
  qDestroyQuery(pQuery);
  taosArrayDestroy(req.pMultiTag);
  tEncoderClear(&coder);
  RAW_LOG_END
  return code;
}
1825

1826
/* Same as taos_write_raw_block_with_fields_with_reqid() with a request id of 0. */
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
                                     int numFields) {
  const int64_t reqid = 0;
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, reqid);
}
1830

1831
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
291✔
1832
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
1833
  if (taos == NULL || pData == NULL || tbname == NULL) {
291✔
UNCOV
1834
    uError("invalid parameter in %s", __func__);
×
UNCOV
1835
    return TSDB_CODE_INVALID_PARA;
×
1836
  }
1837
  int32_t     code = TSDB_CODE_SUCCESS;
291✔
1838
  int32_t     lino = 0;
291✔
1839
  STableMeta* pTableMeta = NULL;
291✔
1840
  SQuery*     pQuery = NULL;
291✔
1841
  SHashObj*   pVgHash = NULL;
291✔
1842

1843
  SRequestObj* pRequest = NULL;
291✔
1844
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
291✔
1845

1846
  uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
291✔
1847
         rows, pData, tbname, fields, numFields);
1848

1849
  pRequest->syncQuery = true;
291✔
1850
  if (!pRequest->pDb) {
291✔
UNCOV
1851
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
UNCOV
1852
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
UNCOV
1853
    goto end;
×
1854
  }
1855

1856
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
291✔
1857
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
291✔
1858
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
291✔
1859

1860
  struct SCatalog* pCatalog = NULL;
291✔
1861
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
291✔
1862

1863
  SRequestConnInfo conn = {0};
291✔
1864
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
291✔
1865
  conn.requestId = pRequest->requestId;
291✔
1866
  conn.requestObjRefId = pRequest->self;
291✔
1867
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
291✔
1868

1869
  SVgroupInfo vgData = {0};
291✔
1870
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
291✔
1871
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
291✔
1872
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
291✔
1873
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
291✔
1874
  RAW_NULL_CHECK(pVgHash);
291✔
1875
  RAW_RETURN_CHECK(
291✔
1876
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1877
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
291✔
1878
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
291✔
1879

1880
  launchQueryImpl(pRequest, pQuery, true, NULL);
291✔
1881
  code = pRequest->code;
291✔
1882
  uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code));
291✔
1883

1884
end:
291✔
1885
  taosMemoryFreeClear(pTableMeta);
291✔
1886
  qDestroyQuery(pQuery);
291✔
1887
  destroyRequest(pRequest);
291✔
1888
  taosHashCleanup(pVgHash);
291✔
1889
  RAW_LOG_END
291✔
1890
  return code;
291✔
1891
}
1892

1893
/* Same as taos_write_raw_block_with_reqid() with a request id of 0. */
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  const int64_t reqid = 0;
  return taos_write_raw_block_with_reqid(taos, rows, pData, tbname, reqid);
}
1896

1897
/*
 * Write one raw data block into table `tbname` of the database currently selected
 * on `taos`. No field list is passed to rawBlockBindData() (NULL / 0).
 *
 * Returns TSDB_CODE_INVALID_PARA when taos, pData or tbname is NULL,
 * TSDB_CODE_PAR_DB_NOT_SPECIFIED when no database is selected, otherwise the first
 * error encountered or the code of the executed request.
 */
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
  if (taos == NULL || pData == NULL || tbname == NULL) {
    // log the rejection like the other entry points in this file do
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STableMeta* pTableMeta = NULL;
  SQuery*     pQuery = NULL;
  SHashObj*   pVgHash = NULL;

  SRequestObj* pRequest = NULL;
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));

  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // fully qualified name of the target table: account id + current db + tbname
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));

  struct SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  SVgroupInfo vgData = {0};
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
  // single-entry vgroup map consumed by smlBuildOutput()
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgHash);
  RAW_RETURN_CHECK(
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false));
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;
  uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  taosMemoryFreeClear(pTableMeta);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  taosHashCleanup(pVgHash);
  RAW_LOG_END
  return code;
}
1955

1956
static void* getRawDataFromRes(void* pRetrieve) {
3,238✔
1957
  if (pRetrieve == NULL) {
3,238✔
UNCOV
1958
    uError("invalid parameter in %s", __func__);
×
UNCOV
1959
    return NULL;
×
1960
  }
1961
  void* rawData = NULL;
3,238✔
1962
  // deal with compatibility
1963
  if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
3,238✔
UNCOV
1964
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
1965
  } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
3,238✔
UNCOV
1966
             *(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
×
1967
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
3,238✔
1968
  }
1969
  return rawData;
3,238✔
1970
}
1971

1972
/*
 * Decode every create-table request attached to `rsp` and store it in `pHashObj`
 * keyed by table name. A name that is already present keeps its first entry; the
 * duplicate request is destroyed. Only TSDB_CHILD_TABLE requests are accepted,
 * anything else fails with TSDB_CODE_INVALID_MSG.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the first error.
 */
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
  if (rsp == NULL || pHashObj == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // find schema data info
  int32_t       code = 0;
  int32_t       lino = 0;
  SVCreateTbReq pCreateReq = {0};
  SDecoder      decoderTmp = {0};
  RAW_LOG_START
  for (int idx = 0; idx < rsp->createTableNum; idx++) {
    void** ppReqData = taosArrayGet(rsp->createTableReq, idx);
    RAW_NULL_CHECK(ppReqData);
    int32_t* pReqLen = taosArrayGet(rsp->createTableLen, idx);
    RAW_NULL_CHECK(pReqLen);

    tDecoderInit(&decoderTmp, *ppReqData, *pReqLen);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));

    if (pCreateReq.type != TSDB_CHILD_TABLE) {
      uError("invalid table type %d in %s", pCreateReq.type, __func__);
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }

    size_t nameLen = strlen(pCreateReq.name);
    if (taosHashGet(pHashObj, pCreateReq.name, nameLen) != NULL) {
      // duplicate name: keep the entry already stored and drop this one
      tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
    } else {
      RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, nameLen, &pCreateReq, sizeof(SVCreateTbReq)));
    }

    tDecoderClear(&decoderTmp);
    // reset so that the cleanup at `end` does not destroy a request now held by the map
    pCreateReq = (SVCreateTbReq){0};
  }

end:
  tDecoderClear(&decoderTmp);
  tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
  RAW_LOG_END
  return code;
}
2014

2015
/* States stored in `initedFlag`. */
typedef enum {
  WRITE_RAW_INIT_START = 0,  // initial value of initedFlag
  WRITE_RAW_INIT_OK = 1,
  WRITE_RAW_INIT_FAIL = 2,
} WRITE_RAW_INIT_STATUS;
2020

2021
static SHashObj* writeRawCache = NULL;
2022
static int8_t    initFlag = 0;
2023
static int8_t    initedFlag = WRITE_RAW_INIT_START;
2024

2025
typedef struct {
2026
  SHashObj* pVgHash;
2027
  SHashObj* pNameHash;
2028
  SHashObj* pMetaHash;
2029
} rawCacheInfo;
2030

2031
typedef struct {
2032
  SVgroupInfo vgInfo;
2033
  int64_t     uid;
2034
  int64_t     suid;
2035
} tbInfo;
2036

2037
static void tmqFreeMeta(void* data) {
722✔
2038
  if (data == NULL) {
722✔
UNCOV
2039
    uError("invalid parameter in %s", __func__);
×
UNCOV
2040
    return;
×
2041
  }
2042
  STableMeta* pTableMeta = *(STableMeta**)data;
722✔
2043
  taosMemoryFree(pTableMeta);
722✔
2044
}
2045

UNCOV
2046
static void freeRawCache(void* data) {
×
UNCOV
2047
  if (data == NULL) {
×
UNCOV
2048
    uError("invalid parameter in %s", __func__);
×
UNCOV
2049
    return;
×
2050
  }
UNCOV
2051
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
UNCOV
2052
  taosHashCleanup(pRawCache->pMetaHash);
×
UNCOV
2053
  taosHashCleanup(pRawCache->pNameHash);
×
UNCOV
2054
  taosHashCleanup(pRawCache->pVgHash);
×
2055
}
2056

2057
static int32_t initRawCacheHash() {
1,791✔
2058
  if (writeRawCache == NULL) {
1,791✔
2059
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,791✔
2060
    if (writeRawCache == NULL) {
1,791✔
UNCOV
2061
      return terrno;
×
2062
    }
2063
    taosHashSetFreeFp(writeRawCache, freeRawCache);
1,791✔
2064
  }
2065
  return 0;
1,791✔
2066
}
2067

UNCOV
2068
// Decide whether the cached table meta no longer matches the schema carried with a raw block.
// Returns true when the column counts differ, when a column of `pSW` has no column of the same
// name in `pTableMeta`, or when checkSchema() rejects a matched column. Returns false otherwise,
// and also when `rawData`, `pSW` or `pTableMeta` is NULL.
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
  if (rawData == NULL || pSW == NULL) {
    return false;
  }
  if (pTableMeta == NULL) {
    uError("invalid parameter in %s", __func__);
    return false;
  }

  // Raw block layout:
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema |
  // each column length |
  // Skip the five int32 fields and the uint64 block group id to reach the column schema section.
  const size_t headerLen = 5 * sizeof(int32_t) + sizeof(uint64_t);
  int8_t*      fields = (int8_t*)((char*)rawData + headerLen);

  const int32_t numOfCols = pTableMeta->tableInfo.numOfColumns;
  if (pSW->nCols != numOfCols) {
    return true;
  }

  for (int i = 0; i < pSW->nCols; i++) {
    bool matched = false;
    for (int j = 0; j < numOfCols; j++) {
      SSchema*    pColSchema = &pTableMeta->schema[j];
      SSchemaExt* pColExtSchema = &pTableMeta->schemaExt[j];
      char*       fieldName = pSW->pSchema[i].name;

      if (strcmp(pColSchema->name, fieldName) != 0) {
        continue;
      }
      if (checkSchema(pColSchema, pColExtSchema, fields, NULL, 0) != 0) {
        return true;
      }
      matched = true;
      break;
    }
    if (!matched) {
      return true;
    }
    // each column schema entry in the block is one int8 followed by one int32
    fields += sizeof(int8_t) + sizeof(int32_t);
  }
  return false;
}
2111

2112
// Look up (or create and register) the three lookup hashes cached for `key` in `writeRawCache`.
//   pVgHash / pNameHash / pMetaHash: out-parameters receiving the cached hashes
//   key: cache key; the pointer value itself is hashed (callers pass the TAOS handle)
// Returns 0 on success. On failure every hash created by this call is destroyed and all three
// out-parameters are set to NULL, so the caller never sees a pointer to freed memory.
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t   code = 0;
  int32_t   lino = 0;
  // Hashes created by this call; they are published through the out-parameters only after they
  // have been registered in writeRawCache.
  SHashObj* pNewVg = NULL;
  SHashObj* pNewName = NULL;
  SHashObj* pNewMeta = NULL;
  RAW_LOG_START
  rawCacheInfo* pCached = (rawCacheInfo*)taosHashGet(writeRawCache, &key, POINTER_BYTES);
  if (pCached != NULL) {
    *pVgHash = pCached->pVgHash;
    *pNameHash = pCached->pNameHash;
    *pMetaHash = pCached->pMetaHash;
  } else {
    pNewVg = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(pNewVg);
    pNewName = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(pNewName);
    pNewMeta = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(pNewMeta);
    // values of the meta hash are STableMeta* and are released through tmqFreeMeta
    taosHashSetFreeFp(pNewMeta, tmqFreeMeta);
    rawCacheInfo info = {pNewVg, pNewName, pNewMeta};
    RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));

    *pVgHash = pNewVg;
    *pNameHash = pNewName;
    *pMetaHash = pNewMeta;
  }

end:
  if (code != 0) {
    // only the creation path can fail, so these are exactly the hashes created above
    taosHashCleanup(pNewMeta);
    taosHashCleanup(pNewName);
    taosHashCleanup(pNewVg);
    *pVgHash = NULL;
    *pNameHash = NULL;
    *pMetaHash = NULL;
  }
  RAW_LOG_END
  return code;
}
2147

2148
// Create a synchronous request object for `taos`, fetch the catalog handle of its cluster and
// fill `conn` with the transport, request ids and management endpoints taken from the request.
// Fails with TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database.
// The request stored in `*pRequest` is not released here, even on failure.
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
  if (taos == NULL || pRequest == NULL || pCatalog == NULL || conn == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  RAW_LOG_START
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));

  SRequestObj* req = *pRequest;
  req->syncQuery = true;
  if (req->pDb == NULL) {
    uError("%s no database selected", __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  RAW_RETURN_CHECK(catalogGetHandle(req->pTscObj->pAppInfo->clusterId, pCatalog));
  conn->pTrans = req->pTscObj->pAppInfo->pTransporter;
  conn->requestId = req->requestId;
  conn->requestObjRefId = req->self;
  conn->mgmtEps = getEpSet_s(&req->pTscObj->pAppInfo->mgmtEp);

end:
  RAW_LOG_END
  return code;
}
2174

2175
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
2176
static int32_t  decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func,
1,434✔
2177
                              SMqRspObj* rspObj) {
2178
  if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) {
1,434✔
UNCOV
2179
    uError("invalid parameter in %s", __func__);
×
UNCOV
2180
    return TSDB_CODE_INVALID_PARA;
×
2181
  }
2182
  int8_t dataVersion = *(int8_t*)data;
1,434✔
2183
  if (dataVersion >= MQ_DATA_RSP_VERSION) {
1,434✔
2184
    data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
1,434✔
2185
    if (dataLen < sizeof(int8_t) + sizeof(int32_t)) {
1,434✔
UNCOV
2186
      return TSDB_CODE_INVALID_PARA;
×
2187
    }
2188
    dataLen -= sizeof(int8_t) + sizeof(int32_t);
1,434✔
2189
  }
2190

2191
  rspObj->resIter = -1;
1,434✔
2192
  tDecoderInit(decoder, data, dataLen);
1,434✔
2193
  int32_t code = func(decoder, &rspObj->dataRsp);
1,434✔
2194
  if (code != 0) {
1,434✔
UNCOV
2195
    SET_ERROR_MSG("decode mq taosx data rsp failed");
×
2196
  }
2197
  return code;
1,434✔
2198
}
2199

2200
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
3,238✔
2201
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
2202
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
2203
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
3,238✔
2204
      pMeta == NULL) {
UNCOV
2205
    uError("invalid parameter in %s", __func__);
×
UNCOV
2206
    return TSDB_CODE_INVALID_PARA;
×
2207
  }
2208
  int32_t code = 0;
3,238✔
2209
  int32_t lino = 0;
3,238✔
2210
  RAW_LOG_START
3,238✔
2211
  STableMeta* pTableMeta = NULL;
3,238✔
2212
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
3,238✔
2213
  if (tmpInfo == NULL || retry > 0) {
3,238✔
2214
    tbInfo info = {0};
3,238✔
2215

2216
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
3,238✔
2217
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
3,238✔
2218
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
724✔
2219
    }
2220
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
3,238✔
2221
    info.uid = pTableMeta->uid;
3,238✔
2222
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
3,238✔
2223
      info.suid = pTableMeta->suid;
1,070✔
2224
    } else {
2225
      info.suid = pTableMeta->uid;
2,168✔
2226
    }
2227
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
3,238✔
2228
    RAW_RETURN_CHECK(code);
3,238✔
2229

2230
    uDebug("put table meta to hash1, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d", info.suid,
3,238✔
2231
           taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2232
    if (pCreateReqDst) {
3,238✔
2233
      pTableMeta->vgId = info.vgInfo.vgId;
724✔
2234
      pTableMeta->uid = pCreateReqDst->uid;
724✔
2235
      pCreateReqDst->ctb.suid = pTableMeta->suid;
724✔
2236
    }
2237

2238
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
3,238✔
2239
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
3,238✔
2240
    RAW_RETURN_CHECK(
3,238✔
2241
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
2242
  }
2243

2244
  if (pTableMeta == NULL || retry > 0) {
3,238✔
UNCOV
2245
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
×
UNCOV
2246
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
×
UNCOV
2247
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
×
UNCOV
2248
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
×
UNCOV
2249
      RAW_RETURN_CHECK(code);
×
UNCOV
2250
      uDebug("put table meta to hash2, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d",
×
2251
             tmpInfo->suid, taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2252
    } else {
UNCOV
2253
      pTableMeta = *pTableMetaTmp;
×
UNCOV
2254
      pTableMeta->uid = tmpInfo->uid;
×
UNCOV
2255
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
×
2256
    }
2257
  }
2258
  *pMeta = pTableMeta;
3,238✔
2259
  pTableMeta = NULL;
3,238✔
2260

2261
end:
3,238✔
2262
  taosMemoryFree(pTableMeta);
3,238✔
2263
  RAW_LOG_END
3,238✔
2264
  return code;
3,238✔
2265
}
2266

2267
// Write a raw TMQ data message into the database selected on `taos`.
// The buffer is decoded with tDecodeMqDataRsp, every block is bound to its table meta through
// rawBlockBindData(), and the resulting query is launched. When the request fails with an error
// matched by NEED_CLIENT_HANDLE_ERROR the whole pass is repeated (at most 3 retries) with `retry`
// forwarded to processCacheMeta().
// Returns 0 on success or an error code.
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int              retry = 0;
  SQuery*          pQuery = NULL;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};
  SRequestConnInfo conn = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);

    // resIter starts at -1 (set by decodeRawData and again before each retry)
    for (rspObj.resIter++; rspObj.resIter < rspObj.dataRsp.blockNum; rspObj.resIter++) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
      RAW_NULL_CHECK(pSW);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName tbFullName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(tbFullName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(tbFullName.tname, tbName, TSDB_TABLE_NAME_LEN);

      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &tbFullName, &pTableMeta,
                                        pSW, rawData, retry));

      char errBuf[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, errBuf, ERR_MSG_LEN, true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", tbFullName.tname, errBuf);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    bool tryAgain = NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3;
    if (!tryAgain) {
      break;
    }
    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;
  }
  uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  RAW_LOG_END
  return code;
}
2341

2342
// Write a raw TMQ "metadata" message (data blocks plus create-table requests) into the database
// selected on `taos`.
// The buffer is decoded with tDecodeSTaosxRsp, the create-table requests are indexed by table name,
// and every block is bound through rawBlockBindData() together with its create request when one
// exists. When the request fails with an error matched by NEED_CLIENT_HANDLE_ERROR the whole pass
// is repeated (at most 3 retries).
// Returns 0 on success or an error code.
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int              retry = 0;
  SQuery*          pQuery = NULL;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SHashObj*        pCreateTbHash = NULL;
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};
  SRequestConnInfo conn = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));

  pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pCreateTbHash);
  RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));

  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);

    // resIter starts at -1 (set by decodeRawData and again before each retry)
    for (rspObj.resIter++; rspObj.resIter < rspObj.dataRsp.blockNum; rspObj.resIter++) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
      RAW_NULL_CHECK(pSW);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName tbFullName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(tbFullName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(tbFullName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // create-table request for this table, if the message carried one (NULL otherwise)
      SVCreateTbReq* pCreateReqDst =
          (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbFullName.tname, strlen(tbFullName.tname));
      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &tbFullName,
                                        &pTableMeta, pSW, rawData, retry));

      char errBuf[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, errBuf, ERR_MSG_LEN,
                              true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", tbFullName.tname, errBuf);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    bool tryAgain = NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3;
    if (!tryAgain) {
      break;
    }
    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;
  }
  uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteSTaosxRsp(&rspObj.dataRsp);
  // the hash stores the decoded requests by value; destroy each one before dropping the hash
  for (void* pIter = taosHashIterate(pCreateTbHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pCreateTbHash, pIter)) {
    tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
  }
  taosHashCleanup(pCreateTbHash);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  RAW_LOG_END
  return code;
}
2431

2432
// Write a raw TMQ "rawdata" message into the database selected on `taos`.
// The buffer is decoded with tDecodeMqDataRsp and every block is attached to the statement's
// vgroup data blocks through rawBlockBindRawData(); a temporary per-pass hash (`pVgroupHash`) is
// used while binding. When the request fails with an error matched by NEED_CLIENT_HANDLE_ERROR the
// whole pass is repeated (at most 3 retries).
// Returns 0 on success or an error code.
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int              retry = 0;
  SQuery*          pQuery = NULL;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SHashObj*        pVgroupHash = NULL;
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};
  SRequestConnInfo conn = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));

  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
    pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(pVgroupHash);
    pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
    RAW_NULL_CHECK(pStmt->pVgDataBlocks);

    // resIter starts at -1 (set by decodeRawData and again before each retry)
    for (rspObj.resIter++; rspObj.resIter < rspObj.dataRsp.blockNum; rspObj.resIter++) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName tbFullName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(tbFullName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(tbFullName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // no schema wrapper / raw data is passed, so processCacheMeta() does no schema comparison
      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &tbFullName, &pTableMeta,
                                        NULL, NULL, retry));

      char errBuf[ERR_MSG_LEN] = {0};  // nothing fills this buffer; the message below prints it empty
      code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", tbFullName.tname, errBuf);
        goto end;
      }
    }
    taosHashCleanup(pVgroupHash);
    pVgroupHash = NULL;

    RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    bool tryAgain = NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3;
    if (!tryAgain) {
      break;
    }
    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;
  }
  uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  taosHashCleanup(pVgroupHash);
  destroyRequest(pRequest);
  RAW_LOG_END
  return code;
}
2517

2518
// Convert one meta response into JSON by dispatching on its message type to the matching
// process* helper, which stores its result in `*meta`.
// Message types other than the ones listed leave `*meta` untouched and return 0.
static int32_t processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
  if (pMetaRsp == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  RAW_LOG_START
  switch (pMetaRsp->resMsgType) {
    case TDMT_VND_CREATE_STB:
      RAW_RETURN_CHECK(processCreateStb(pMetaRsp, meta));
      break;
    case TDMT_VND_ALTER_STB:
      RAW_RETURN_CHECK(processAlterStb(pMetaRsp, meta));
      break;
    case TDMT_VND_DROP_STB:
      RAW_RETURN_CHECK(processDropSTable(pMetaRsp, meta));
      break;
    case TDMT_VND_CREATE_TABLE:
      RAW_RETURN_CHECK(processCreateTable(pMetaRsp, meta));
      break;
    case TDMT_VND_ALTER_TABLE:
      RAW_RETURN_CHECK(processAlterTable(pMetaRsp, meta));
      break;
    case TDMT_VND_DROP_TABLE:
      RAW_RETURN_CHECK(processDropTable(pMetaRsp, meta));
      break;
    case TDMT_VND_DELETE:
      RAW_RETURN_CHECK(processDeleteTable(pMetaRsp, meta));
      break;
    default:
      break;
  }

end:
  RAW_LOG_END
  return code;
}
2546

2547
// Decode a batch meta response and render it as one JSON document:
//   {"tmq_meta_version": TMQ_META_VERSION, "metas": [ <one item per decoded meta> ]}
// Each batch entry is an SMqRspHead followed by an encoded SMqMetaRsp; entries for which
// processSimpleMeta() produces no JSON are skipped.
// On success `*string` receives the result of cJSON_PrintUnformatted() (NULL when printing fails);
// the caller owns it. Returns 0 on success or the first error code encountered.
static int32_t processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
  if (pMsgRsp == NULL || string == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDecoder        coder = {0};
  SMqBatchMetaRsp rsp = {0};
  int32_t         code = 0;
  int32_t         lino = 0;
  cJSON*          pJson = NULL;
  // Per-entry state lives at function scope so that `end` can release it when an entry fails midway.
  SDecoder        metaCoder = {0};
  SMqMetaRsp      metaRsp = {0};
  cJSON*          pPending = NULL;  // item produced for the current entry but not yet owned by the array
  tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
  RAW_RETURN_CHECK(tDecodeMqBatchMetaRsp(&coder, &rsp));

  pJson = cJSON_CreateObject();
  RAW_NULL_CHECK(pJson);
  RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION));
  cJSON* pMetaArr = cJSON_AddArrayToObject(pJson, "metas");
  RAW_NULL_CHECK(pMetaArr);

  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);
    // an entry shorter than its header would make the length below wrap around
    RAW_FALSE_CHECK(*len >= (int32_t)sizeof(SMqRspHead));

    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    RAW_RETURN_CHECK(tDecodeMqMetaRsp(&metaCoder, &metaRsp));
    cJSON* pItem = NULL;
    RAW_RETURN_CHECK(processSimpleMeta(&metaRsp, &pItem));
    pPending = pItem;

    tDeleteMqMetaRsp(&metaRsp);
    metaRsp = (SMqMetaRsp){0};  // reset so the cleanup at `end` does not release it a second time
    tDecoderClear(&metaCoder);

    if (pPending != NULL) {
      RAW_FALSE_CHECK(cJSON_AddItemToArray(pMetaArr, pPending));
      pPending = NULL;  // now owned by pMetaArr
    }
  }

  char* fullStr = cJSON_PrintUnformatted(pJson);
  *string = fullStr;

end:
  // everything below is a no-op on the success path (already released / reset above)
  cJSON_Delete(pPending);
  tDeleteMqMetaRsp(&metaRsp);
  tDecoderClear(&metaCoder);
  tDecoderClear(&coder);
  cJSON_Delete(pJson);
  tDeleteMqBatchMetaRsp(&rsp);
  RAW_LOG_END
  return code;
}
2591

2592
// Render the meta information of a TMQ result as a JSON string.
// Accepts meta, metadata and batch-meta results; any other result type, a NULL `res`, or a failure
// in the conversion yields NULL. The returned string is released with tmq_free_json_meta().
char* tmq_get_json_meta(TAOS_RES* res) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  char*   string = NULL;
  RAW_LOG_START
  RAW_NULL_CHECK(res);
  RAW_FALSE_CHECK(TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res));

  SMqRspObj* pRspObj = (SMqRspObj*)res;
  if (TD_RES_TMQ_METADATA(res)) {
    RAW_RETURN_CHECK(processAutoCreateTable(&pRspObj->dataRsp, &string));
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    RAW_RETURN_CHECK(processBatchMetaToJson(&pRspObj->batchMetaRsp, &string));
  } else if (TD_RES_TMQ_META(res)) {
    cJSON* pMetaJson = NULL;
    RAW_RETURN_CHECK(processSimpleMeta(&pRspObj->metaRsp, &pMetaJson));
    string = cJSON_PrintUnformatted(pMetaJson);
    cJSON_Delete(pMetaJson);
  } else {
    // kept as a safety net; the type check above already rejects every other result type
    uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res);
  }

  uDebug("tmq_get_json_meta string:%s", string);

end:
  RAW_LOG_END
  return string;
}
2620

2621
// Release a string returned by tmq_get_json_meta().
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}
14,592✔
2622

2623
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
1,482✔
2624
  if (pRsp == NULL) {
1,482✔
UNCOV
2625
    uError("invalid parameter in %s", __func__);
×
UNCOV
2626
    return TSDB_CODE_INVALID_PARA;
×
2627
  }
2628
  int32_t pos = 0;
1,482✔
2629
  int32_t code = 0;
1,482✔
2630
  int32_t lino = 0;
1,482✔
2631
  RAW_LOG_START
1,482✔
2632
  SEncoder coder = {0};
1,482✔
2633
  tEncoderInit(&coder, NULL, 0);
1,482✔
2634
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset));
1,482✔
2635
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset));
1,482✔
2636
  pos = coder.pos;
1,482✔
2637
  tEncoderClear(&coder);
1,482✔
2638

2639
end:
1,482✔
2640
  if (code != 0) {
1,482✔
UNCOV
2641
    uError("getOffSetLen failed, code:%d", code);
×
UNCOV
2642
    return code;
×
2643
  } else {
2644
    uDebug("getOffSetLen success, len:%d", pos);
1,482✔
2645
    return pos;
1,482✔
2646
  }
2647
}
2648

2649
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
2650
static int32_t  encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
1,482✔
2651
  if (raw == NULL || encodeFunc == NULL || rspObj == NULL) {
1,482✔
UNCOV
2652
    uError("invalid parameter in %s", __func__);
×
UNCOV
2653
    return TSDB_CODE_INVALID_PARA;
×
2654
  }
2655
  uint32_t len = 0;
1,482✔
2656
  int32_t  code = 0;
1,482✔
2657
  int32_t  lino = 0;
1,482✔
2658
  SEncoder encoder = {0};
1,482✔
2659
  void*    buf = NULL;
1,482✔
2660
  tEncodeSize(encodeFunc, rspObj, len, code);
1,482✔
2661
  RAW_FALSE_CHECK(code >= 0);
1,482✔
2662
  len += sizeof(int8_t) + sizeof(int32_t);
1,482✔
2663
  buf = taosMemoryCalloc(1, len);
1,482✔
2664
  RAW_NULL_CHECK(buf);
1,482✔
2665
  tEncoderInit(&encoder, buf, len);
1,482✔
2666
  RAW_RETURN_CHECK(tEncodeI8(&encoder, MQ_DATA_RSP_VERSION));
1,482✔
2667
  int32_t offsetLen = getOffSetLen(rspObj);
1,482✔
2668
  RAW_FALSE_CHECK(offsetLen > 0);
1,482✔
2669
  RAW_RETURN_CHECK(tEncodeI32(&encoder, offsetLen));
1,482✔
2670
  RAW_RETURN_CHECK(encodeFunc(&encoder, rspObj));
1,482✔
2671

2672
  raw->raw = buf;
1,482✔
2673
  buf = NULL;
1,482✔
2674
  raw->raw_len = len;
1,482✔
2675

2676
end:
1,482✔
2677
  RAW_LOG_END
1,482✔
2678
  return code;
1,482✔
2679
}
2680

2681
// Export a TMQ result as a raw buffer description in `*raw`.
//   meta / batch meta: `raw->raw` aliases the buffer held by the result
//   data / metadata:   `raw->raw` is a newly encoded buffer (see encodeMqDataRsp)
//   rawdata:           the result's raw buffer is moved into `raw` and detached from the result
// `*raw` is zeroed first. Returns 0 on success, TSDB_CODE_TMQ_INVALID_MSG for an unknown result
// type, or the encoding error.
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (raw == NULL || res == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  RAW_LOG_START
  *raw = (tmq_raw_data){0};
  SMqRspObj* pRspObj = (SMqRspObj*)res;

  if (TD_RES_TMQ_META(res)) {
    SMqMetaRsp* pMetaRsp = &pRspObj->metaRsp;
    raw->raw = pMetaRsp->metaRsp;
    // a negative length is reported as 0
    raw->raw_len = pMetaRsp->metaRspLen >= 0 ? pMetaRsp->metaRspLen : 0;
    raw->raw_type = pMetaRsp->resMsgType;
    uDebug("tmq get raw type meta:%p", raw);
  } else if (TD_RES_TMQ(res)) {
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeMqDataRsp, &pRspObj->dataRsp, raw));
    raw->raw_type = RES_TYPE__TMQ;
    uDebug("tmq get raw type data:%p", raw);
  } else if (TD_RES_TMQ_METADATA(res)) {
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeSTaosxRsp, &pRspObj->dataRsp, raw));
    raw->raw_type = RES_TYPE__TMQ_METADATA;
    uDebug("tmq get raw type metadata:%p", raw);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    SMqBatchMetaRsp* pBatchRsp = &pRspObj->batchMetaRsp;
    raw->raw = pBatchRsp->pMetaBuff;
    raw->raw_len = pBatchRsp->metaBuffLen;
    raw->raw_type = pRspObj->resType;
    uDebug("tmq get raw batch meta:%p", raw);
  } else if (TD_RES_TMQ_RAW(res)) {
    SMqDataRsp* pDataRsp = &pRspObj->dataRsp;
    raw->raw = pDataRsp->rawData;
    pDataRsp->rawData = NULL;  // the buffer now belongs to `raw`
    raw->raw_len = pDataRsp->len;
    raw->raw_type = pRspObj->resType;
    uDebug("tmq get raw raw:%p", raw);
  } else {
    uError("tmq get raw error type:%d", *(int8_t*)res);
    code = TSDB_CODE_TMQ_INVALID_MSG;
  }

end:
  RAW_LOG_END
  return code;
}
2724

2725
/**
 * Release the buffer referenced by a tmq_raw_data and clear terrMsg.
 *
 * - RES_TYPE__TMQ / RES_TYPE__TMQ_METADATA: raw.raw itself is freed.
 * - RES_TYPE__TMQ_RAWDATA (non-NULL raw.raw): the block that is freed starts
 *   sizeof(SMqRspHead) bytes before raw.raw.
 * - Any other type: nothing is freed here.
 *
 * @param raw  Descriptor passed by value; only its buffer is released.
 */
void tmq_free_raw(tmq_raw_data raw) {
  uDebug("tmq free raw data type:%d", raw.raw_type);
  if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
    taosMemoryFree(raw.raw);
  } else if (raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL) {
    // Step back over the header with an explicit pointer subtraction.
    // Negating sizeof() yields a huge unsigned value, so adding it to the
    // pointer only works through wrap-around, which is not defined behaviour.
    taosMemoryFree((void*)((char*)raw.raw - sizeof(SMqRspHead)));
  }
  (void)memset(terrMsg, 0, ERR_MSG_LEN);
}
9,729✔
2734

2735
static int32_t writeRawInit() {
17,250✔
2736
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
19,041✔
2737
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
1,791✔
2738
    if (old == 0) {
1,791✔
2739
      int32_t code = initRawCacheHash();
1,791✔
2740
      if (code != 0) {
1,791✔
2741
        uError("tmq writeRawImpl init error:%d", code);
×
UNCOV
2742
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
UNCOV
2743
        return code;
×
2744
      }
2745
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
1,791✔
2746
    }
2747
  }
2748

2749
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
17,250✔
UNCOV
2750
    return TSDB_CODE_INTERNAL_ERROR;
×
2751
  }
2752
  return 0;
17,250✔
2753
}
2754

2755
/**
 * Dispatch one raw message to the handler that matches its type.
 *
 * @param taos  Connection handle; must not be NULL.
 * @param buf   Message buffer; must not be NULL.
 * @param len   Length of buf in bytes, forwarded unchanged to the handler.
 * @param type  Message type (a TDMT_VND_* or RES_TYPE__TMQ* value).
 * @return The selected handler's result; TSDB_CODE_INVALID_PARA for NULL
 *         arguments or an unhandled type; TSDB_CODE_INTERNAL_ERROR when
 *         writeRawInit() reports a failure.
 */
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
  if (buf == NULL || taos == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  if (writeRawInit() != 0) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  // Super-table create and alter requests share the same handler.
  if (type == TDMT_VND_CREATE_STB || type == TDMT_VND_ALTER_STB) {
    return taosCreateStb(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_STB) {
    return taosDropStb(taos, buf, len);
  }
  if (type == TDMT_VND_CREATE_TABLE) {
    return taosCreateTable(taos, buf, len);
  }
  if (type == TDMT_VND_ALTER_TABLE) {
    return taosAlterTable(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_TABLE) {
    return taosDropTable(taos, buf, len);
  }
  if (type == TDMT_VND_DELETE) {
    return taosDeleteData(taos, buf, len);
  }

  // TMQ result payloads
  if (type == RES_TYPE__TMQ_METADATA) {
    return tmqWriteRawMetaDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_RAWDATA) {
    return tmqWriteRawRawDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ) {
    return tmqWriteRawDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_BATCH_META) {
    return tmqWriteBatchMetaDataImpl(taos, buf, len);
  }

  return TSDB_CODE_INVALID_PARA;
}
2789

2790
/**
 * Write a raw TMQ payload through the given connection.
 *
 * @param taos  Connection handle; must not be NULL.
 * @param raw   Payload descriptor; raw.raw must be non-NULL and raw.raw_len
 *              greater than zero.
 * @return TSDB_CODE_INVALID_PARA when the arguments fail validation (an error
 *         message is recorded via SET_ERROR_MSG), otherwise the result of
 *         writeRawImpl().
 */
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
  if (!(taos != NULL && raw.raw != NULL && raw.raw_len > 0)) {
    SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
    return TSDB_CODE_INVALID_PARA;
  }

  // clear global error message
  taosClearErrMsg();

  return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
}
2799

2800
/**
 * Decode a batch-meta buffer and write each contained meta message.
 *
 * The buffer is decoded into an SMqBatchMetaRsp. For every entry, the bytes
 * after the leading SMqRspHead are decoded as an SMqMetaRsp, whose payload is
 * then passed to writeRawImpl(). Processing stops at the first failure.
 *
 * @param taos     Connection handle; must not be NULL.
 * @param meta     Encoded batch-meta buffer; must not be NULL.
 * @param metaLen  Length of meta in bytes.
 * @return TSDB_CODE_SUCCESS when every entry was written;
 *         TSDB_CODE_INVALID_PARA for NULL arguments;
 *         TSDB_CODE_TMQ_INVALID_MSG when an entry is shorter than SMqRspHead;
 *         otherwise the first error produced by decoding or by writeRawImpl().
 */
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SMqBatchMetaRsp rsp = {0};
  SDecoder        coder = {0};
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;

  RAW_LOG_START
  // decode and process req
  // NOTE(review): neither decoder initialised in this function is handed to a
  // matching teardown call - confirm that none is required.
  tDecoderInit(&coder, meta, metaLen);
  RAW_RETURN_CHECK(tDecodeMqBatchMetaRsp(&coder, &rsp));
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // The entry must at least hold the header that is skipped below. Without
    // this check a short or negative length would wrap to a huge unsigned
    // size and let the inner decoder run past the entry.
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      uError("%s invalid batch meta entry, index:%d len:%d", __func__, i, *len);
      code = TSDB_CODE_TMQ_INVALID_MSG;
      lino = __LINE__;
      goto end;
    }
    int32_t bodyLen = *len - (int32_t)sizeof(SMqRspHead);

    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), bodyLen);
    RAW_RETURN_CHECK(tDecodeMqMetaRsp(&metaCoder, &metaRsp));
    code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
    // release the decoded entry before acting on the write result
    tDeleteMqMetaRsp(&metaRsp);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
  }

end:
  tDeleteMqBatchMetaRsp(&rsp);
  RAW_LOG_END
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc