• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4918

08 Jan 2026 11:50AM UTC coverage: 65.916% (+0.5%) from 65.42%
#4918

push

travis-ci

web-flow
merge: from main to 3.0 branch #34215

16 of 22 new or added lines in 3 files covered. (72.73%)

788 existing lines in 116 files now uncovered.

204221 of 309822 relevant lines covered (65.92%)

126504701.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.99
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "parser.h"
19
#include "tcol.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tdef.h"
23
#include "tglobal.h"
24
#include "tmsgtype.h"
25

26
/*
 * Logging and error-propagation helpers used by the functions in this file.
 * They expand inside a function that declares `int32_t code` and
 * `int32_t lino`; the *_CHECK macros additionally need an `end:` label.
 * Every macro argument is parenthesized so that compound expressions
 * (e.g. `a ? b : c`) are evaluated as a unit.
 */

/* Logs the outcome of the enclosing function according to `code`. */
#define RAW_LOG_END                                                           \
  if (code != 0) {                                                            \
    uError("%s failed at line:%d since:%s", __func__, lino, tstrerror(code)); \
  } else {                                                                    \
    uDebug("%s return success", __func__);                                    \
  }

/* Logs entry into the enclosing function (the expansion carries its own ';'). */
#define RAW_LOG_START uDebug("%s start", __func__);

/* If pointer expression `c` is NULL: record the line, set code = terrno, jump to `end`. */
#define RAW_NULL_CHECK(c) \
  do {                    \
    if ((c) == NULL) {    \
      lino = __LINE__;    \
      code = terrno;      \
      goto end;           \
    }                     \
  } while (0)

/* If `c` is false: set code = TSDB_CODE_INVALID_PARA, record the line, jump to `end`. */
#define RAW_FALSE_CHECK(c)           \
  do {                               \
    if (!(c)) {                      \
      code = TSDB_CODE_INVALID_PARA; \
      lino = __LINE__;               \
      goto end;                      \
    }                                \
  } while (0)

/* Stores `c` in `code`; on a non-zero value records the line and jumps to `end`. */
#define RAW_RETURN_CHECK(c) \
  do {                      \
    code = (c);             \
    if (code != 0) {        \
      lino = __LINE__;      \
      goto end;             \
    }                       \
  } while (0)

/* Format fragment and matching argument pair; LOG_ID_VALUE needs `taos` and `pRequest` in scope. */
#define LOG_ID_TAG   "connId:0x%" PRIx64 ", QID:0x%" PRIx64
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId

#define TMQ_META_VERSION "1.0"
66

67
/*
 * Attaches `item` to `object` under the key `string`.
 * When cJSON_AddItemToObject reports failure, `item` is deleted here.
 * Returns true when the item was attached, false otherwise.
 */
static bool tmqAddJsonObjectItem(cJSON* object, const char* string, cJSON* item) {
  if (cJSON_AddItemToObject(object, string, item)) {
    return true;
  }
  cJSON_Delete(item);
  return false;
}
74
/*
 * Appends `item` to the JSON array `array`.
 * When cJSON_AddItemToArray reports failure, `item` is deleted here.
 * Returns true when the item was appended, false otherwise.
 */
static bool tmqAddJsonArrayItem(cJSON* array, cJSON* item) {
  if (cJSON_AddItemToArray(array, item)) {
    return true;
  }
  cJSON_Delete(item);
  return false;
}
81

82
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen);
83
/*
 * Offsets `suid` by the 32-bit Murmur hash of the database name `db`.
 * A NULL `db` returns `suid` unchanged.
 */
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
  if (db != NULL) {
    return suid + MurmurHash3_32(db, strlen(db));
  }
  return suid;
}
89
static int32_t buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, SExtSchema* pExtSchemas,
26,334✔
90
                                    char* name, int64_t id, int8_t t, SColCmprWrapper* pColCmprRow, cJSON** pJson) {
91
  if (schemaRow == NULL || name == NULL || pColCmprRow == NULL || pJson == NULL) {
26,334✔
92
    uError("invalid parameter, schemaRow:%p, name:%p, pColCmprRow:%p, pJson:%p", schemaRow, name, pColCmprRow, pJson);
×
93
    return TSDB_CODE_INVALID_PARA;
×
94
  }
95
  int32_t code = TSDB_CODE_SUCCESS;
26,334✔
96
  int32_t lino = 0;
26,334✔
97
  int8_t  buildDefaultCompress = 0;
26,334✔
98
  if (pColCmprRow->nCols <= 0) {
26,334✔
UNCOV
99
    buildDefaultCompress = 1;
×
100
  }
101
  RAW_LOG_START
26,334✔
102
  char*  string = NULL;
26,334✔
103
  cJSON* json = cJSON_CreateObject();
26,334✔
104
  RAW_NULL_CHECK(json);
26,334✔
105
  cJSON* type = cJSON_CreateString("create");
26,334✔
106
  RAW_NULL_CHECK(type);
26,334✔
107

108
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
26,334✔
109
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
26,334✔
110
  RAW_NULL_CHECK(tableType);
26,334✔
111
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
26,334✔
112
  cJSON* tableName = cJSON_CreateString(name);
26,334✔
113
  RAW_NULL_CHECK(tableName);
26,334✔
114
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
26,334✔
115

116
  cJSON* columns = cJSON_CreateArray();
26,334✔
117
  RAW_NULL_CHECK(columns);
26,334✔
118
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "columns", columns));
26,334✔
119

120
  for (int i = 0; i < schemaRow->nCols; i++) {
165,213✔
121
    cJSON* column = cJSON_CreateObject();
138,879✔
122
    RAW_NULL_CHECK(column);
138,879✔
123
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(columns, column));
138,879✔
124
    SSchema* s = schemaRow->pSchema + i;
138,879✔
125
    cJSON*   cname = cJSON_CreateString(s->name);
138,879✔
126
    RAW_NULL_CHECK(cname);
138,879✔
127
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "name", cname));
138,879✔
128
    cJSON* ctype = cJSON_CreateNumber(s->type);
138,879✔
129
    RAW_NULL_CHECK(ctype);
138,879✔
130
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "type", ctype));
138,879✔
131
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
153,910✔
132
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
15,031✔
133
      cJSON*  cbytes = cJSON_CreateNumber(length);
15,031✔
134
      RAW_NULL_CHECK(cbytes);
15,031✔
135
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
15,031✔
136
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
123,848✔
137
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
2,159✔
138
      cJSON*  cbytes = cJSON_CreateNumber(length);
2,159✔
139
      RAW_NULL_CHECK(cbytes);
2,159✔
140
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
2,159✔
141
    } else if (IS_STR_DATA_BLOB(s->type)) {
121,689✔
142
      int32_t length = s->bytes - BLOBSTR_HEADER_SIZE;
×
143
      cJSON*  cbytes = cJSON_CreateNumber(length);
×
144
      RAW_NULL_CHECK(cbytes);
×
145
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
×
146
    } else if (s->type == TSDB_DATA_TYPE_DECIMAL || s->type == TSDB_DATA_TYPE_DECIMAL64) {
121,689✔
147
      int32_t length = pExtSchemas[i].typeMod;
330✔
148
      cJSON*  cbytes = cJSON_CreateNumber(length);
330✔
149
      RAW_NULL_CHECK(cbytes);
330✔
150
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
330✔
151
    }
152
    cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
138,879✔
153
    RAW_NULL_CHECK(isPk);
138,879✔
154
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "isPrimarykey", isPk));
138,879✔
155

156
    if (pColCmprRow == NULL) {
138,879✔
157
      continue;
×
158
    }
159

160
    uint32_t alg = 0;
138,879✔
161
    if (buildDefaultCompress) {
138,879✔
UNCOV
162
      alg = createDefaultColCmprByType(s->type);
×
163
    } else {
164
      SColCmpr* pColCmpr = pColCmprRow->pColCmpr + i;
138,879✔
165
      alg = pColCmpr->alg;
138,879✔
166
    }
167
    const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(alg));
138,879✔
168
    RAW_NULL_CHECK(encode);
138,879✔
169
    const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(alg));
138,879✔
170
    RAW_NULL_CHECK(compress);
138,879✔
171
    const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(alg));
138,879✔
172
    RAW_NULL_CHECK(level);
138,879✔
173

174
    cJSON* encodeJson = cJSON_CreateString(encode);
138,879✔
175
    RAW_NULL_CHECK(encodeJson);
138,879✔
176
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "encode", encodeJson));
138,879✔
177

178
    cJSON* compressJson = cJSON_CreateString(compress);
138,879✔
179
    RAW_NULL_CHECK(compressJson);
138,879✔
180
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "compress", compressJson));
138,879✔
181

182
    cJSON* levelJson = cJSON_CreateString(level);
138,879✔
183
    RAW_NULL_CHECK(levelJson);
138,879✔
184
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "level", levelJson));
138,879✔
185
  }
186

187
  cJSON* tags = cJSON_CreateArray();
26,334✔
188
  RAW_NULL_CHECK(tags);
26,334✔
189
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
26,334✔
190

191
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
78,350✔
192
    cJSON* tag = cJSON_CreateObject();
52,016✔
193
    RAW_NULL_CHECK(tag);
52,016✔
194
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
52,016✔
195
    SSchema* s = schemaTag->pSchema + i;
52,016✔
196
    cJSON*   tname = cJSON_CreateString(s->name);
52,016✔
197
    RAW_NULL_CHECK(tname);
52,016✔
198
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
52,016✔
199
    cJSON* ttype = cJSON_CreateNumber(s->type);
52,016✔
200
    RAW_NULL_CHECK(ttype);
52,016✔
201
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
52,016✔
202
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
54,911✔
203
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
2,895✔
204
      cJSON*  cbytes = cJSON_CreateNumber(length);
2,895✔
205
      RAW_NULL_CHECK(cbytes);
2,895✔
206
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
2,895✔
207
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
49,121✔
208
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
15,071✔
209
      cJSON*  cbytes = cJSON_CreateNumber(length);
15,071✔
210
      RAW_NULL_CHECK(cbytes);
15,071✔
211
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
15,071✔
212
    } else if (IS_STR_DATA_BLOB(s->type)) {
34,050✔
213
      int32_t length = s->bytes - BLOBSTR_HEADER_SIZE;
×
214
      cJSON*  cbytes = cJSON_CreateNumber(length);
×
215
      RAW_NULL_CHECK(cbytes);
×
216
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
×
217
    }
218
  }
219

220
end:
26,334✔
221
  *pJson = json;
26,334✔
222
  RAW_LOG_END
26,334✔
223
  return code;
26,334✔
224
}
225

226
/*
 * Adds at most one compression attribute to `json`, taken from the packed
 * option word `para`. The first non-zero component wins, in this order:
 *   L1 type  -> "encode"
 *   L2 type  -> "compress"
 *   L2 level -> "level"
 * Nothing is added when all three components are zero.
 *
 * Returns TSDB_CODE_INVALID_PARA when `json` is NULL or the insertion fails,
 * terrno when a name lookup or string creation returns NULL, otherwise 0.
 */
static int32_t setCompressOption(cJSON* json, uint32_t para) {
  if (json == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t lino = 0;
  int32_t code = 0;
  RAW_LOG_START

  const uint8_t l1Type = COMPRESS_L1_TYPE_U32(para);
  const uint8_t l2Type = COMPRESS_L2_TYPE_U32(para);
  const uint8_t l2Level = COMPRESS_L2_TYPE_LEVEL_U32(para);

  if (l1Type != 0) {
    const char* text = columnEncodeStr(l1Type);
    RAW_NULL_CHECK(text);
    cJSON* node = cJSON_CreateString(text);
    RAW_NULL_CHECK(node);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "encode", node));
  } else if (l2Type != 0) {
    const char* text = columnCompressStr(l2Type);
    RAW_NULL_CHECK(text);
    cJSON* node = cJSON_CreateString(text);
    RAW_NULL_CHECK(node);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "compress", node));
  } else if (l2Level != 0) {
    const char* text = columnLevelStr(l2Level);
    RAW_NULL_CHECK(text);
    cJSON* node = cJSON_CreateString(text);
    RAW_NULL_CHECK(node);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "level", node));
  }

end:
  RAW_LOG_END
  return code;
}
265
static int32_t buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) {
12,503✔
266
  if (alterData == NULL || pJson == NULL) {
12,503✔
267
    uError("invalid parameter in %s alterData:%p", __func__, alterData);
×
268
    return TSDB_CODE_INVALID_PARA;
×
269
  }
270
  SMAlterStbReq req = {0};
12,503✔
271
  cJSON*        json = NULL;
12,503✔
272
  char*         string = NULL;
12,503✔
273
  int32_t       code = 0;
12,503✔
274
  int32_t       lino = 0;
12,503✔
275
  RAW_LOG_START
12,503✔
276
  RAW_RETURN_CHECK(tDeserializeSMAlterStbReq(alterData, alterDataLen, &req));
12,503✔
277
  json = cJSON_CreateObject();
12,503✔
278
  RAW_NULL_CHECK(json);
12,503✔
279
  cJSON* type = cJSON_CreateString("alter");
12,503✔
280
  RAW_NULL_CHECK(type);
12,503✔
281
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
12,503✔
282
  SName name = {0};
12,503✔
283
  RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
12,503✔
284
  cJSON* tableType = cJSON_CreateString("super");
12,503✔
285
  RAW_NULL_CHECK(tableType);
12,503✔
286
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
12,503✔
287
  cJSON* tableName = cJSON_CreateString(name.tname);
12,503✔
288
  RAW_NULL_CHECK(tableName);
12,503✔
289
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
12,503✔
290

291
  cJSON* alterType = cJSON_CreateNumber(req.alterType);
12,503✔
292
  RAW_NULL_CHECK(alterType);
12,503✔
293
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));
12,503✔
294
  switch (req.alterType) {
12,503✔
295
    case TSDB_ALTER_TABLE_ADD_TAG:
7,332✔
296
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
297
      if (taosArrayGetSize(req.pFields) != 1) {
7,332✔
298
        uError("invalid field num %" PRIzu " for alter type %d", taosArrayGetSize(req.pFields), req.alterType);
×
299
        cJSON_Delete(json);
×
300
        json = NULL;
×
301
        code = TSDB_CODE_INVALID_PARA;
×
302
        goto end;
×
303
      }
304
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
7,332✔
305
      RAW_NULL_CHECK(field);
7,332✔
306
      cJSON* colName = cJSON_CreateString(field->name);
7,332✔
307
      RAW_NULL_CHECK(colName);
7,332✔
308
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
7,332✔
309
      cJSON* colType = cJSON_CreateNumber(field->type);
7,332✔
310
      RAW_NULL_CHECK(colType);
7,332✔
311
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
7,332✔
312

313
      if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
7,332✔
314
          field->type == TSDB_DATA_TYPE_GEOMETRY) {
7,332✔
315
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
2,727✔
316
        cJSON*  cbytes = cJSON_CreateNumber(length);
2,727✔
317
        RAW_NULL_CHECK(cbytes);
2,727✔
318
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
2,727✔
319
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
4,605✔
320
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
283✔
321
        cJSON*  cbytes = cJSON_CreateNumber(length);
283✔
322
        RAW_NULL_CHECK(cbytes);
283✔
323
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
283✔
324
      } else if (IS_STR_DATA_BLOB(field->type)) {
4,322✔
325
        int32_t length = field->bytes - BLOBSTR_HEADER_SIZE;
×
326
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
327
        RAW_NULL_CHECK(cbytes);
×
328
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
329
      }
330
      break;
7,332✔
331
    }
332
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
×
333
      SFieldWithOptions* field = taosArrayGet(req.pFields, 0);
×
334
      RAW_NULL_CHECK(field);
×
335
      cJSON* colName = cJSON_CreateString(field->name);
×
336
      RAW_NULL_CHECK(colName);
×
337
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
×
338
      cJSON* colType = cJSON_CreateNumber(field->type);
×
339
      RAW_NULL_CHECK(colType);
×
340
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
×
341

342
      if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
×
343
          field->type == TSDB_DATA_TYPE_GEOMETRY) {
×
344
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
×
345
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
346
        RAW_NULL_CHECK(cbytes);
×
347
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
348
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
×
349
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
×
350
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
351
        RAW_NULL_CHECK(cbytes);
×
352
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
353
      } else if (IS_STR_DATA_BLOB(field->type)) {
×
354
        int32_t length = field->bytes - BLOBSTR_HEADER_SIZE;
×
355
        cJSON*  cbytes = cJSON_CreateNumber(length);
×
356
        RAW_NULL_CHECK(cbytes);
×
357
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
×
358
      }
359

360
      RAW_RETURN_CHECK(setCompressOption(json, field->compress));
×
361
      break;
×
362
    }
363
    case TSDB_ALTER_TABLE_DROP_TAG:
2,161✔
364
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
365
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
2,161✔
366
      RAW_NULL_CHECK(field);
2,161✔
367
      cJSON* colName = cJSON_CreateString(field->name);
2,161✔
368
      RAW_NULL_CHECK(colName);
2,161✔
369
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
2,161✔
370
      break;
2,161✔
371
    }
372
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
3,010✔
373
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
374
      if (taosArrayGetSize(req.pFields) != 1) {
3,010✔
375
        uError("invalid field num %" PRIzu " for alter type %d", taosArrayGetSize(req.pFields), req.alterType);
×
376
        cJSON_Delete(json);
×
377
        json = NULL;
×
378
        code = TSDB_CODE_INVALID_PARA;
×
379
        goto end;
×
380
      }
381
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
3,010✔
382
      RAW_NULL_CHECK(field);
3,010✔
383
      cJSON* colName = cJSON_CreateString(field->name);
3,010✔
384
      RAW_NULL_CHECK(colName);
3,010✔
385
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
3,010✔
386
      cJSON* colType = cJSON_CreateNumber(field->type);
3,010✔
387
      RAW_NULL_CHECK(colType);
3,010✔
388
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
3,010✔
389
      if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
3,010✔
390
          field->type == TSDB_DATA_TYPE_GEOMETRY) {
3,010✔
391
        int32_t length = field->bytes - VARSTR_HEADER_SIZE;
2,727✔
392
        cJSON*  cbytes = cJSON_CreateNumber(length);
2,727✔
393
        RAW_NULL_CHECK(cbytes);
2,727✔
394
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
2,727✔
395
      } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
283✔
396
        int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
283✔
397
        cJSON*  cbytes = cJSON_CreateNumber(length);
283✔
398
        RAW_NULL_CHECK(cbytes);
283✔
399
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
283✔
400
      }
401
      break;
3,010✔
402
    }
403
    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
×
404
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
405
      TAOS_FIELD* oldField = taosArrayGet(req.pFields, 0);
×
406
      RAW_NULL_CHECK(oldField);
×
407
      TAOS_FIELD* newField = taosArrayGet(req.pFields, 1);
×
408
      RAW_NULL_CHECK(newField);
×
409
      cJSON* colName = cJSON_CreateString(oldField->name);
×
410
      RAW_NULL_CHECK(colName);
×
411
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
×
412
      cJSON* colNewName = cJSON_CreateString(newField->name);
×
413
      RAW_NULL_CHECK(colNewName);
×
414
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName));
×
415
      break;
×
416
    }
417
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
×
418
      TAOS_FIELD* field = taosArrayGet(req.pFields, 0);
×
419
      RAW_NULL_CHECK(field);
×
420
      cJSON* colName = cJSON_CreateString(field->name);
×
421
      RAW_NULL_CHECK(colName);
×
422
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
×
423
      RAW_RETURN_CHECK(setCompressOption(json, field->bytes));
×
424
      break;
×
425
    }
426
    default:
×
427
      break;
×
428
  }
429

430
end:
12,503✔
431
  tFreeSMAltertbReq(&req);
12,503✔
432
  *pJson = json;
12,503✔
433
  RAW_LOG_END
12,503✔
434
  return code;
12,503✔
435
}
436

437
/*
 * Decodes the create-super-table request carried by `metaRsp` and renders it
 * as JSON through buildCreateTableJson().
 * The encoded request starts sizeof(SMsgHead) bytes into metaRsp->metaRsp.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the first
 * non-zero code from decoding or JSON building (0 on success).
 */
static int32_t processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDecoder       coder = {0};
  SVCreateStbReq req = {0};
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;

  RAW_LOG_START
  // skip the message head that precedes the encoded request
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, payload, payloadLen);

  code = tDecodeSVCreateStbReq(&coder, &req);
  if (code != 0) {
    lino = __LINE__;
    goto end;
  }
  code = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.pExtSchemas, req.name, req.suid, TSDB_SUPER_TABLE,
                              &req.colCmpr, pJson);
  if (code != 0) {
    lino = __LINE__;
  }

end:
  tDecoderClear(&coder);
  RAW_LOG_END
  return code;
}
462

463
/*
 * Decodes the SVCreateStbReq carried by `metaRsp` and renders the original
 * alter request embedded in it (alterOriData / alterOriDataLen) as JSON
 * through buildAlterSTableJson().
 * The encoded request starts sizeof(SMsgHead) bytes into metaRsp->metaRsp.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the first
 * non-zero code from decoding or JSON building (0 on success).
 */
static int32_t processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;
  SDecoder       coder = {0};
  SVCreateStbReq req = {0};
  RAW_LOG_START

  // skip the message head that precedes the encoded request
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, payload, payloadLen);

  code = tDecodeSVCreateStbReq(&coder, &req);
  if (code != 0) {
    lino = __LINE__;
    goto end;
  }
  code = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen, pJson);
  if (code != 0) {
    lino = __LINE__;
  }

end:
  tDecoderClear(&coder);
  RAW_LOG_END
  return code;
}
487

488
/*
 * Fills `json` with the description of one child table taken from
 * `pCreateReq`: "tableName", "using" (super table name), "tagNum" and a
 * "tags" array whose entries carry "name", "type" and "value".
 *
 * A JSON-typed tag set produces a single entry whose value is the text from
 * parseTagDatatoJson(), or no entry at all when the tag set is empty.
 * Otherwise one entry is produced per tag value: variable-length values are
 * rendered with dataConverToStr(), all others as a number.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments or a failed JSON
 * insertion, terrno when a creation/lookup call returned NULL, the code from
 * tTagToValArray()/dataConverToStr() when they fail, otherwise 0.
 */
static int32_t buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
  if (json == NULL || pCreateReq == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  STag*   pTag = (STag*)pCreateReq->ctb.pTag;
  char*   sname = pCreateReq->ctb.stbName;
  char*   name = pCreateReq->name;
  SArray* tagName = pCreateReq->ctb.tagName;
  uint8_t tagNum = pCreateReq->ctb.tagNum;
  int32_t code = 0;
  int32_t lino = 0;
  SArray* pTagVals = NULL;
  char*   pJson = NULL;
  RAW_LOG_START

  cJSON* tableName = cJSON_CreateString(name);
  RAW_NULL_CHECK(tableName);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
  cJSON* using = cJSON_CreateString(sname);
  RAW_NULL_CHECK(using);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "using", using));
  cJSON* tagNumJson = cJSON_CreateNumber(tagNum);
  RAW_NULL_CHECK(tagNumJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tagNum", tagNumJson));

  cJSON* tags = cJSON_CreateArray();
  RAW_NULL_CHECK(tags);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
  RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals));

  if (tTagIsJson(pTag)) {
    if (pTag->nTag == 0) {
      uWarn("p->nTag == 0");
      goto end;
    }
    parseTagDatatoJson(pTag, &pJson, NULL);
    RAW_NULL_CHECK(pJson);
    cJSON* tag = cJSON_CreateObject();
    RAW_NULL_CHECK(tag);
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
    STagVal* pTagVal = taosArrayGet(pTagVals, 0);
    RAW_NULL_CHECK(pTagVal);
    char* ptname = taosArrayGet(tagName, 0);
    RAW_NULL_CHECK(ptname);
    cJSON* tname = cJSON_CreateString(ptname);
    RAW_NULL_CHECK(tname);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
    cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
    RAW_NULL_CHECK(ttype);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
    cJSON* tvalue = cJSON_CreateString(pJson);
    RAW_NULL_CHECK(tvalue);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
    goto end;
  }

  for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
    RAW_NULL_CHECK(pTagVal);
    cJSON* tag = cJSON_CreateObject();
    RAW_NULL_CHECK(tag);
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
    char* ptname = taosArrayGet(tagName, i);
    RAW_NULL_CHECK(ptname);
    cJSON* tname = cJSON_CreateString(ptname);
    RAW_NULL_CHECK(tname);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
    cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
    RAW_NULL_CHECK(ttype);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));

    cJSON* tvalue = NULL;
    if (IS_VAR_DATA_TYPE(pTagVal->type)) {
      if (IS_STR_DATA_BLOB(pTagVal->type)) {
        // NOTE(review): a blob tag stops processing with success and leaves this
        // entry without "value" - confirm that this is intended.
        goto end;
      }
      int64_t bufSize = 0;
      if (pTagVal->type == TSDB_DATA_TYPE_VARBINARY) {
        bufSize = pTagVal->nData * 2 + 2 + 3;
      } else {
        bufSize = pTagVal->nData + 3;
      }
      char* buf = taosMemoryCalloc(bufSize, 1);
      RAW_NULL_CHECK(buf);
      code = dataConverToStr(buf, bufSize, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        uError("convert tag value to string failed");
        lino = __LINE__;  // so the closing log entry points at the failed conversion
        taosMemoryFree(buf);
        goto end;
      }

      tvalue = cJSON_CreateString(buf);
      taosMemoryFree(buf);
      RAW_NULL_CHECK(tvalue);
    } else {
      double val = 0;
      GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64,
                     0);  // currently tag type can't be decimal, so pass 0 as typeMod
      tvalue = cJSON_CreateNumber(val);
      RAW_NULL_CHECK(tvalue);
    }

    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
  }

end:
  taosMemoryFree(pJson);
  taosArrayDestroy(pTagVals);
  RAW_LOG_END
  return code;
}
601

602
/*
 * Builds the "create" JSON description for one or more child tables.
 *
 * The top-level object describes pCreateReq[0] ("type", "tableType" = "child"
 * plus the fields written by buildChildElement()). "createList" is an array
 * that stays empty for a single request and otherwise holds one object per
 * request, including the first.
 *
 *   pCreateReq  array of `nReqs` create requests (required)
 *   nReqs       number of requests in `pCreateReq`
 *   pJson       receives the JSON object; it is written even when an error is
 *               returned, in which case it may be NULL or only partially filled
 *
 * Returns 0 on success, otherwise the first error code encountered.
 */
static int32_t buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) {
  if (pJson == NULL || pCreateReq == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  RAW_LOG_START
  cJSON* json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("create");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));

  cJSON* tableType = cJSON_CreateString("child");
  RAW_NULL_CHECK(tableType);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));

  RAW_RETURN_CHECK(buildChildElement(json, pCreateReq));
  cJSON* createList = cJSON_CreateArray();
  RAW_NULL_CHECK(createList);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "createList", createList));

  for (int i = 0; nReqs > 1 && i < nReqs; i++) {
    cJSON* create = cJSON_CreateObject();
    RAW_NULL_CHECK(create);
    // Attach before filling: once `create` belongs to createList, a failure in
    // buildChildElement() can no longer leak it.
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(createList, create));
    RAW_RETURN_CHECK(buildChildElement(create, pCreateReq + i));
  }

end:
  *pJson = json;
  RAW_LOG_END
  return code;
}
638

639
/*
 * Decodes the batch create-table request carried by `metaRsp` and renders it
 * as JSON. The type of the first request selects the builder:
 *   TSDB_CHILD_TABLE  -> buildCreateCTableJson() over the whole batch
 *   TSDB_NORMAL_TABLE -> buildCreateTableJson() for the first request
 * An empty batch, or any other table type, leaves *pJson untouched.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the first
 * non-zero code from decoding or JSON building (0 on success).
 */
static int32_t processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateTbBatchReq req = {0};
  SDecoder           decoder = {0};
  int32_t            lino = 0;
  int32_t            code = TSDB_CODE_SUCCESS;
  RAW_LOG_START

  // the encoded batch follows the message head
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, payload, payloadLen);
  RAW_RETURN_CHECK(tDecodeSVCreateTbBatchReq(&decoder, &req));

  if (req.nReqs <= 0) {
    goto end;
  }

  SVCreateTbReq* first = req.pReqs;
  if (first->type == TSDB_CHILD_TABLE) {
    RAW_RETURN_CHECK(buildCreateCTableJson(req.pReqs, req.nReqs, pJson));
  } else if (first->type == TSDB_NORMAL_TABLE) {
    RAW_RETURN_CHECK(buildCreateTableJson(&first->ntb.schemaRow, NULL, first->pExtSchemas, first->name, first->uid,
                                          TSDB_NORMAL_TABLE, &first->colCmpr, pJson));
  }

end:
  tDeleteSVCreateTbBatchReq(&req);
  tDecoderClear(&decoder);
  RAW_LOG_END
  return code;
}
672

673
// Builds the JSON text describing the auto-create-table requests carried by a data response.
//
// rsp:    data response; entry i of createTableReq / createTableLen is decoded as one SVCreateTbReq
// string: receives the result of cJSON_PrintUnformatted on the generated JSON
// Returns TSDB_CODE_SUCCESS, or the code set by the first failing check.
static int32_t processAutoCreateTable(SMqDataRsp* rsp, char** string) {
  int32_t lino = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  // These three are read in the cleanup section below, so they must hold a defined value before
  // the first check that can bail out to `end`.
  SDecoder*      decoder = NULL;
  SVCreateTbReq* pCreateReq = NULL;
  cJSON*         pJson = NULL;
  RAW_LOG_START
  RAW_FALSE_CHECK(rsp != NULL && string != NULL);
  RAW_FALSE_CHECK(rsp->createTableNum > 0);

  decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
  RAW_NULL_CHECK(decoder);
  pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
  RAW_NULL_CHECK(pCreateReq);

  // loop to create table
  for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) {
    // decode
    void** data = taosArrayGet(rsp->createTableReq, iReq);
    RAW_NULL_CHECK(data);
    int32_t* len = taosArrayGet(rsp->createTableLen, iReq);
    RAW_NULL_CHECK(len);
    tDecoderInit(&decoder[iReq], *data, *len);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq));

    if (pCreateReq[iReq].type != TSDB_CHILD_TABLE && pCreateReq[iReq].type != TSDB_NORMAL_TABLE) {
      uError("%s failed. pCreateReq[iReq].type:%d invalid", __func__, pCreateReq[iReq].type);
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }
  }

  // the type of the first request selects the builder for the whole batch
  if (pCreateReq->type == TSDB_NORMAL_TABLE) {
    RAW_RETURN_CHECK(buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->pExtSchemas, pCreateReq->name,
                                          pCreateReq->uid, TSDB_NORMAL_TABLE, &pCreateReq->colCmpr, &pJson));
  } else if (pCreateReq->type == TSDB_CHILD_TABLE) {
    RAW_RETURN_CHECK(buildCreateCTableJson(pCreateReq, rsp->createTableNum, &pJson));
  }

  *string = cJSON_PrintUnformatted(pJson);

  uDebug("auto created table return, sql json:%s", *string);

end:
  RAW_LOG_END
  // Released on every path: a failing builder may already have stored a partial object in pJson.
  cJSON_Delete(pJson);
  for (int i = 0; decoder && pCreateReq && i < rsp->createTableNum; i++) {
    tDecoderClear(&decoder[i]);
    taosMemoryFreeClear(pCreateReq[i].comment);
    if (pCreateReq[i].type == TSDB_CHILD_TABLE) {
      taosArrayDestroy(pCreateReq[i].ctb.tagName);
    }
  }
  taosMemoryFree(decoder);
  taosMemoryFree(pCreateReq);
  return code;
}
729

730
// Adds a "colLength" member to `json` when `type` is a variable-length column type
// (BINARY / VARBINARY / GEOMETRY: bytes minus the var-string header; NCHAR: that value divided
// by TSDB_NCHAR_SIZE). Other types add nothing and succeed.
static int32_t alterTbAddColLengthJson(cJSON* json, int32_t type, int32_t bytes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t length = 0;

  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_GEOMETRY) {
    length = bytes - VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_NCHAR) {
    length = (bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  } else {
    return code;
  }

  cJSON* cbytes = cJSON_CreateNumber(length);
  RAW_NULL_CHECK(cbytes);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));

end:
  if (code != 0) {
    uError("%s failed at line:%d since:%s", __func__, lino, tstrerror(code));
  }
  return code;
}

// Decodes an alter-table message and describes it as a JSON object.
//
// metaRsp: meta response whose payload (after the SMsgHead) is an encoded SVAlterTbReq
// pJson:   always receives the JSON object built so far (NULL if nothing was created), also on failure
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the code of the first failing step.
//
// Every message gets "type", "tableType", "tableName" and "alterType"; the remaining members
// depend on the alter action. Actions without a dedicated case add nothing further.
static int32_t processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  cJSON*       json = NULL;
  int32_t      code = 0;
  int32_t      lino = 0;
  RAW_LOG_START

  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  RAW_RETURN_CHECK(tDecodeSVAlterTbReq(&decoder, &vAlterTbReq));

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("alter");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
  // tag-value updates are reported as "child", every other action as "normal"
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
                                                vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL
                                            ? "child"
                                            : "normal");
  RAW_NULL_CHECK(tableType);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
  RAW_NULL_CHECK(tableName);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
  RAW_NULL_CHECK(alterType);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));

  uDebug("alter table action:%d", vAlterTbReq.action);
  switch (vAlterTbReq.action) {
    // both add-column variants share name/type/length; only the second carries a compress option
    case TSDB_ALTER_TABLE_ADD_COLUMN:
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
      RAW_NULL_CHECK(colType);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));

      RAW_RETURN_CHECK(alterTbAddColLengthJson(json, vAlterTbReq.type, vAlterTbReq.bytes));
      if (vAlterTbReq.action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION) {
        RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      }
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
      RAW_NULL_CHECK(colType);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
      RAW_RETURN_CHECK(alterTbAddColLengthJson(json, vAlterTbReq.colModType, vAlterTbReq.colModBytes));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
      RAW_NULL_CHECK(colNewName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
      RAW_NULL_CHECK(tagName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", tagName));

      bool isNull = vAlterTbReq.isNull;
      // A JSON tag without any entries is reported as NULL. Guarded because the request is
      // zero-initialised, so pTagVal stays NULL whenever the decoder did not fill it in.
      if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON && vAlterTbReq.pTagVal != NULL) {
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag->nTag == 0) isNull = true;
      }
      if (!isNull) {
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          if (vAlterTbReq.pTagVal == NULL || !tTagIsJson(vAlterTbReq.pTagVal)) {
            code = TSDB_CODE_INVALID_PARA;
            uError("processAlterTable isJson false");
            goto end;
          }
          parseTagDatatoJson(vAlterTbReq.pTagVal, &buf, NULL);
          if (buf == NULL) {
            code = TSDB_CODE_INVALID_PARA;
            uError("parseTagDatatoJson failed, buf == NULL");
            goto end;
          }
        } else {
          int64_t bufSize = 0;
          if (vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY) {
            bufSize = vAlterTbReq.nTagVal * 2 + 2 + 3;
          } else {
            bufSize = vAlterTbReq.nTagVal + 32;
          }
          buf = taosMemoryCalloc(bufSize, 1);
          RAW_NULL_CHECK(buf);
          code = dataConverToStr(buf, bufSize, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL);
          if (code != TSDB_CODE_SUCCESS) {
            uError("convert tag value to string failed");
            taosMemoryFree(buf);
            goto end;
          }
        }

        // cJSON_CreateString copies the text, so buf can be released right away
        cJSON* colValue = cJSON_CreateString(buf);
        taosMemoryFree(buf);
        RAW_NULL_CHECK(colValue);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValue", colValue));
      }

      cJSON* isNullCJson = cJSON_CreateBool(isNull);
      RAW_NULL_CHECK(isNullCJson);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValueNull", isNullCJson));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: {
      int32_t nTags = taosArrayGetSize(vAlterTbReq.pMultiTag);
      if (nTags <= 0) {
        code = TSDB_CODE_INVALID_PARA;
        uError("processAlterTable parse multi tags error");
        goto end;
      }

      cJSON* tags = cJSON_CreateArray();
      RAW_NULL_CHECK(tags);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));

      for (int32_t i = 0; i < nTags; i++) {
        cJSON* member = cJSON_CreateObject();
        RAW_NULL_CHECK(member);
        RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, member));

        SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
        RAW_NULL_CHECK(pTagVal);
        cJSON* tagName = cJSON_CreateString(pTagVal->tagName);
        RAW_NULL_CHECK(tagName);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colName", tagName));

        // JSON tags are rejected in the multi-tag form
        if (pTagVal->tagType == TSDB_DATA_TYPE_JSON) {
          code = TSDB_CODE_INVALID_PARA;
          uError("processAlterTable isJson false");
          goto end;
        }
        bool isNull = pTagVal->isNull;
        if (!isNull) {
          int64_t bufSize = 0;
          if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
            bufSize = pTagVal->nTagVal * 2 + 2 + 3;
          } else {
            // Same headroom as the single-tag path above: the textual form of a numeric tag is
            // longer than its binary width, so a few spare bytes are not enough.
            bufSize = pTagVal->nTagVal + 32;
          }
          char* buf = taosMemoryCalloc(bufSize, 1);
          RAW_NULL_CHECK(buf);
          code = dataConverToStr(buf, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL);
          if (code != TSDB_CODE_SUCCESS) {
            uError("convert tag value to string failed");
            taosMemoryFree(buf);
            goto end;
          }
          cJSON* colValue = cJSON_CreateString(buf);
          taosMemoryFree(buf);
          RAW_NULL_CHECK(colValue);
          RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValue", colValue));
        }
        cJSON* isNullCJson = cJSON_CreateBool(isNull);
        RAW_NULL_CHECK(isNullCJson);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValueNull", isNullCJson));
      }
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      break;
    }
    default:
      break;
  }

end:
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
    taosArrayDestroy(vAlterTbReq.pMultiTag);
  }
  tDecoderClear(&decoder);
  *pJson = json;
  RAW_LOG_END
  return code;
}
978

979
// Decodes a drop-super-table message into {"type":"drop","tableType":"super","tableName":...}.
//
// metaRsp: meta response whose payload (after the SMsgHead) is an encoded SVDropStbReq
// pJson:   always receives the JSON object built so far (NULL if none was created), also on failure
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the code of the first failing step.
static int32_t processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t      lino = 0;
  int32_t      code = 0;
  cJSON*       json = NULL;
  SVDropStbReq dropReq = {0};
  SDecoder     coder = {0};
  RAW_LOG_START

  // the encoded request sits right behind the message head
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, payload, payloadLen);
  RAW_RETURN_CHECK(tDecodeSVDropStbReq(&coder, &dropReq));

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);

  cJSON* typeItem = cJSON_CreateString("drop");
  RAW_NULL_CHECK(typeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", typeItem));

  cJSON* tableTypeItem = cJSON_CreateString("super");
  RAW_NULL_CHECK(tableTypeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableTypeItem));

  cJSON* tableNameItem = cJSON_CreateString(dropReq.name);
  RAW_NULL_CHECK(tableNameItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableNameItem));

end:
  tDecoderClear(&coder);
  *pJson = json;
  RAW_LOG_END
  return code;
}
1015
// Decodes a delete result and describes it as {"type":"delete","sql":"delete from ..."}.
//
// metaRsp: meta response whose payload (after the SMsgHead) is an encoded SDeleteRes
// pJson:   always receives the JSON object built so far (NULL if none was created), also on failure
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the code of the first failing step.
static int32_t processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  cJSON*     json = NULL;
  char*      sql = NULL;
  int32_t    code = 0;
  int32_t    lino = 0;
  RAW_LOG_START

  // decode and process req
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);

  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeDeleteRes(&coder, &req));

  // Measure first, then format into a buffer of exactly that size: a fixed-size buffer would
  // silently cut the statement short for long table / column names.
  int32_t sqlLen = snprintf(NULL, 0, "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                            req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  RAW_FALSE_CHECK(sqlLen > 0);
  sql = taosMemoryCalloc(1, (size_t)sqlLen + 1);
  RAW_NULL_CHECK(sql);
  (void)snprintf(sql, (size_t)sqlLen + 1, "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                 req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("delete");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
  cJSON* sqlJson = cJSON_CreateString(sql);
  RAW_NULL_CHECK(sqlJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "sql", sqlJson));

end:
  taosMemoryFree(sql);
  tDecoderClear(&coder);
  *pJson = json;
  RAW_LOG_END
  return code;
}
1053

1054
// Decodes a batched drop-table message into {"type":"drop","tableNameList":[...]}.
//
// metaRsp: meta response whose payload (after the SMsgHead) is an encoded SVDropTbBatchReq
// pJson:   always receives the JSON object built so far (NULL if none was created), also on failure
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the code of the first failing step.
static int32_t processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t          lino = 0;
  int32_t          code = 0;
  cJSON*           json = NULL;
  SVDropTbBatchReq batch = {0};
  SDecoder         coder = {0};
  RAW_LOG_START

  // the encoded request sits right behind the message head
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&coder, payload, payloadLen);
  RAW_RETURN_CHECK(tDecodeSVDropTbBatchReq(&coder, &batch));

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);

  cJSON* typeItem = cJSON_CreateString("drop");
  RAW_NULL_CHECK(typeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", typeItem));

  cJSON* nameList = cJSON_CreateArray();
  RAW_NULL_CHECK(nameList);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableNameList", nameList));

  // one array entry per table in the batch, in request order
  for (int32_t idx = 0; idx < batch.nReqs; ++idx) {
    cJSON* nameItem = cJSON_CreateString(batch.pReqs[idx].name);
    RAW_NULL_CHECK(nameItem);
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(nameList, nameItem));
  }

end:
  tDecoderClear(&coder);
  *pJson = json;
  RAW_LOG_END
  return code;
}
1093

1094
// Replays a raw create-super-table message against the connection's current database.
//
// taos:    connection handle; read as an int64_t id to build the request
// meta:    raw message; the encoded SVCreateStbReq starts after the SMsgHead
// metaLen: length of `meta` in bytes
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, TSDB_CODE_PAR_DB_NOT_SPECIFIED when the
// request has no database, otherwise the code of the first failing step or of the request.
static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
  if (meta == NULL || taos == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SDecoder       coder = {0};
  SVCreateStbReq vReq = {0};
  SMCreateStbReq mReq = {0};
  SCmdMsgInfo    cmdMsg = {0};
  RAW_LOG_START

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (pRequest->pDb == NULL) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    goto end;
  }

  // the encoded vnode request sits right behind the message head
  void*    payload = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t payloadLen = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, payload, payloadLen);
  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&coder, &vReq));

  // a message without compression info gets the per-type default for every column
  bool useDefaultCmpr = (vReq.colCmpr.nCols == 0);

  // columns
  mReq.pColumns = taosArrayInit(vReq.schemaRow.nCols, sizeof(SFieldWithOptions));
  RAW_NULL_CHECK(mReq.pColumns);
  for (int32_t col = 0; col < vReq.schemaRow.nCols; ++col) {
    SSchema*          pColSchema = &vReq.schemaRow.pSchema[col];
    SFieldWithOptions colField = {.type = pColSchema->type, .flags = pColSchema->flags, .bytes = pColSchema->bytes};
    tstrncpy(colField.name, pColSchema->name, TSDB_COL_NAME_LEN);

    if (useDefaultCmpr) {
      colField.compress = createDefaultColCmprByType(pColSchema->type);
    } else {
      colField.compress = vReq.colCmpr.pColCmpr[col].alg;
    }
    if (vReq.pExtSchemas) {
      colField.typeMod = vReq.pExtSchemas[col].typeMod;
    }
    RAW_NULL_CHECK(taosArrayPush(mReq.pColumns, &colField));
  }

  // tags
  mReq.pTags = taosArrayInit(vReq.schemaTag.nCols, sizeof(SField));
  RAW_NULL_CHECK(mReq.pTags);
  for (int32_t tag = 0; tag < vReq.schemaTag.nCols; ++tag) {
    SSchema* pTagSchema = &vReq.schemaTag.pSchema[tag];
    SField   tagField = {.type = pTagSchema->type, .flags = pTagSchema->flags, .bytes = pTagSchema->bytes};
    tstrncpy(tagField.name, pTagSchema->name, TSDB_COL_NAME_LEN);
    RAW_NULL_CHECK(taosArrayPush(mReq.pTags, &tagField));
  }

  mReq.numOfColumns = vReq.schemaRow.nCols;
  mReq.numOfTags = vReq.schemaTag.nCols;
  mReq.colVer = vReq.schemaRow.version;
  mReq.tagVer = vReq.schemaTag.version;
  mReq.commentLen = -1;
  mReq.igExists = true;
  mReq.source = TD_REQ_FROM_TAOX;
  mReq.suid = processSuid(vReq.suid, pRequest->pDb);

  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, vReq.name, vReq.suid,
         mReq.suid);

  // qualify the table name with account and database before serialising
  STscObj* pTscObj = pRequest->pTscObj;
  SName    stbName = {0};
  toName(pTscObj->acctId, pRequest->pDb, vReq.name, &stbName);
  RAW_RETURN_CHECK(tNameExtractFullName(&stbName, mReq.name));

  // serialise: first call measures, second call writes
  cmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  cmdMsg.msgType = TDMT_MND_CREATE_STB;
  cmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &mReq);
  RAW_FALSE_CHECK(cmdMsg.msgLen > 0);
  cmdMsg.pMsg = taosMemoryMalloc(cmdMsg.msgLen);
  RAW_NULL_CHECK(cmdMsg.pMsg);
  RAW_FALSE_CHECK(tSerializeSMCreateStbReq(cmdMsg.pMsg, cmdMsg.msgLen, &mReq) > 0);

  SQuery query = {0};
  query.pCmdMsg = &cmdMsg;
  query.msgType = cmdMsg.msgType;
  query.execMode = QUERY_EXEC_MODE_RPC;
  query.stableQuery = true;

  launchQueryImpl(pRequest, &query, true, NULL);  // ignore, because return value is pRequest

  // on success the cached meta for this table is removed from the catalog
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SCatalog* pCatalog = NULL;
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &stbName));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&mReq);
  tDecoderClear(&coder);
  taosMemoryFree(cmdMsg.pMsg);
  RAW_LOG_END
  return code;
}
1201

1202
// Replays a raw drop-super-table message against the connection's current database.
//
// taos:    connection handle; read as an int64_t id to build the request
// meta:    raw message; the encoded SVDropStbReq starts after the SMsgHead
// metaLen: length of `meta` in bytes
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, TSDB_CODE_PAR_DB_NOT_SPECIFIED when the
// request has no database, TSDB_CODE_SUCCESS when the table does not exist (nothing to drop),
// otherwise the code of the first failing step or of the request.
static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropStbReq req = {0};
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SRequestObj* pRequest = NULL;
  SCmdMsgInfo  pCmdMsg = {0};

  RAW_LOG_START
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVDropStbReq(&coder, &req));

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // one qualified name serves the meta lookup, the request and the cache removal
  SName tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);

  // the suid sent to the mnode is the uid this cluster knows the table by, not the one in the message
  STableMeta* pTableMeta = NULL;
  code = catalogGetTableMeta(pCatalog, &conn, &tableName, &pTableMeta);
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    uInfo(LOG_ID_TAG " stable %s not exist, ignore drop", LOG_ID_VALUE, req.name);
    code = TSDB_CODE_SUCCESS;
    taosMemoryFreeClear(pTableMeta);
    goto end;
  }
  RAW_RETURN_CHECK(code);
  pReq.suid = pTableMeta->uid;
  taosMemoryFreeClear(pTableMeta);

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;

  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));

  // serialise: first call measures, second call writes
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  RAW_FALSE_CHECK(tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
  // on success the cached meta for this table is removed from the catalog
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  RAW_LOG_END
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  // same release as in taosCreateStb; without it the buffer leaks when a step after the
  // allocation fails
  taosMemoryFree(pCmdMsg.pMsg);
  return code;
}
1291

1292
typedef struct SVgroupCreateTableBatch {
1293
  SVCreateTbBatchReq req;
1294
  SVgroupInfo        info;
1295
  char               dbName[TSDB_DB_NAME_LEN];
1296
} SVgroupCreateTableBatch;
1297

1298
static void destroyCreateTbReqBatch(void* data) {
34,375✔
1299
  if (data == NULL) {
34,375✔
1300
    uError("invalid parameter in %s", __func__);
×
1301
    return;
×
1302
  }
1303
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
34,375✔
1304
  taosArrayDestroy(pTbBatch->req.pArray);
34,375✔
1305
}
1306

1307
/*
 * Replay a serialized batch of create-table requests on the database currently
 * selected for the connection `taos`.
 *
 * meta    - buffer holding an SMsgHead followed by an encoded SVCreateTbBatchReq
 * metaLen - total size of `meta` in bytes (header included)
 *
 * Each request is routed to its vgroup, child-table tag column ids are remapped
 * to the ids of the local super table, and the per-vgroup batches are sent as a
 * single TDMT_VND_CREATE_TABLE query. Child tables whose super table does not
 * exist locally are skipped.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead, otherwise the unsigned payload length computed below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;

  RAW_LOG_START
  // rebuilt tags are parked here so they stay alive until the query has been sent
  SArray* pTagList = taosArrayInit(0, POINTER_BYTES);
  RAW_NULL_CHECK(pTagList);
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVCreateTbBatchReq(&coder, &req));
  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
    // change tag cid to new cid
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      STableMeta* pTableMeta = NULL;
      SName       sName = {0};
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        uInfo(LOG_ID_TAG " super table %s not exist, ignore create child table %s", LOG_ID_VALUE,
              pCreateReq->ctb.stbName, pCreateReq->name);
        code = TSDB_CODE_SUCCESS;
        taosMemoryFreeClear(pTableMeta);
        continue;
      }

      RAW_RETURN_CHECK(code);
      // the child table must reference the uid of the local super table
      pCreateReq->ctb.suid = pTableMeta->uid;

      SArray* pTagVals = NULL;
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
      if (code != TSDB_CODE_SUCCESS) {
        uError("create tb invalid tag data %s", pCreateReq->name);
        taosMemoryFreeClear(pTableMeta);
        goto end;
      }

      // match every tag by name against the local schema and adopt the local column id
      bool rebuildTag = false;
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
        if (tName == NULL) {
          continue;
        }
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
          SSchema* tag = &pTableMeta->schema[j];
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
            if (pTagVal) {
              if (pTagVal->cid != tag->colId) {
                pTagVal->cid = tag->colId;
                rebuildTag = true;
              }
            } else {
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
            }
          }
        }
      }
      taosMemoryFreeClear(pTableMeta);
      if (rebuildTag) {
        STag* ppTag = NULL;
        code = tTagNew(pTagVals, 1, false, &ppTag);
        taosArrayDestroy(pTagVals);
        pTagVals = NULL;
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        if (NULL == taosArrayPush(pTagList, &ppTag)) {
          code = terrno;
          tTagFree(ppTag);
          goto end;
        }
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
      }
      taosArrayDestroy(pTagVals);
    }
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);
      tBatch.req.source = TD_REQ_FROM_TAOX;

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // until the batch is stored in the hash nobody else owns pArray, so release it on failure
      if (taosArrayPush(tBatch.req.pArray, pCreateReq) == NULL) {
        code = terrno;
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != TSDB_CODE_SUCCESS) {
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
    }
  }

  // every request was skipped, nothing to send
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  // NOTE(review): pBufArray is not released when one of the steps before rewriteToVnodeModifyOpStmt fails - confirm
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_NULL_CHECK(pQuery->pRoot);

  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteSVCreateTbBatchReq(&req);

  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  taosArrayDestroyP(pTagList, NULL);
  RAW_LOG_END
  return code;
}
1485

1486
typedef struct SVgroupDropTableBatch {
1487
  SVDropTbBatchReq req;
1488
  SVgroupInfo      info;
1489
  char             dbName[TSDB_DB_NAME_LEN];
1490
} SVgroupDropTableBatch;
1491

1492
static void destroyDropTbReqBatch(void* data) {
825✔
1493
  if (data == NULL) {
825✔
1494
    uError("invalid parameter in %s", __func__);
×
1495
    return;
×
1496
  }
1497
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
825✔
1498
  taosArrayDestroy(pTbBatch->req.pArray);
825✔
1499
}
1500

1501
/*
 * Replay a serialized batch of drop-table requests on the database currently
 * selected for the connection `taos`.
 *
 * meta    - buffer holding an SMsgHead followed by an encoded SVDropTbBatchReq
 * metaLen - total size of `meta` in bytes (header included)
 *
 * Tables that do not exist locally are skipped. The remaining requests get the
 * local suid, are grouped per vgroup and sent as one TDMT_VND_DROP_TABLE query.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead, otherwise the unsigned payload length computed below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVDropTbBatchReq(&coder, &req));
  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to drop table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      code = TSDB_CODE_SUCCESS;
      uInfo(LOG_ID_TAG " table %s not exist, ignore drop", LOG_ID_VALUE, pDropReq->name);
      taosMemoryFreeClear(pTableMeta);
      continue;
    }
    RAW_RETURN_CHECK(code);
    // replace the suid carried in the request by the one of the local table
    tb_uid_t oldSuid = pDropReq->suid;
    pDropReq->suid = pTableMeta->suid;
    taosMemoryFreeClear(pTableMeta);
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
           pDropReq->suid);

    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // until the batch is stored in the hash nobody else owns pArray, so release it on failure
      if (taosArrayPush(tBatch.req.pArray, pDropReq) == NULL) {
        code = terrno;
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != TSDB_CODE_SUCCESS) {
        lino = __LINE__;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
    }
  }

  // every request was skipped, nothing to send
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  // NOTE(review): pBufArray is not released when one of the steps before rewriteToVnodeModifyOpStmt fails - confirm
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }
  code = pRequest->code;
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  RAW_LOG_END
  return code;
}
1614

1615
/*
 * Replay a serialized delete result as a "delete from ... where ts between"
 * statement on the connection `taos`.
 *
 * meta    - buffer holding an SMsgHead followed by an encoded SDeleteRes
 * metaLen - total size of `meta` in bytes (header included)
 *
 * TSDB_CODE_PAR_TABLE_NOT_EXIST and TSDB_CODE_PAR_GET_META_ERROR reported by
 * the statement are treated as success.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead, otherwise the unsigned payload length computed below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  // sized generously; the truncation check below is what guarantees a complete statement
  char       sql[1024] = {0};
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);

  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeDeleteRes(&coder, &req));
  int32_t sqlLen = snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                            req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  // never run a truncated statement: it could fail to parse or, worse, delete a different time range
  if (sqlLen < 0 || (size_t)sqlLen >= sizeof(sql)) {
    uError("%s delete statement too long, table:%s", __func__, req.tableFName);
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto end;
  }

  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
  RAW_NULL_CHECK(res);
  SRequestObj* pRequest = (SRequestObj*)res;
  code = pRequest->code;
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));

end:
  RAW_LOG_END
  tDecoderClear(&coder);
  return code;
}
1650

1651
/*
 * Replay a serialized alter-table request on the database currently selected
 * for the connection `taos`.
 *
 * meta    - buffer holding an SMsgHead followed by an encoded SVAlterTbReq
 * metaLen - total size of `meta` in bytes (header included)
 *
 * Requests with action TSDB_ALTER_TABLE_UPDATE_OPTIONS are ignored. All other
 * requests are re-encoded with source TD_REQ_FROM_TAOX and sent to the vgroup
 * of the table as a TDMT_VND_ALTER_TABLE query. TSDB_CODE_TDB_TABLE_NOT_EXIST
 * reported by the query is treated as success.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // metaLen must cover the SMsgHead, otherwise the unsigned payload length computed below wraps around
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVAlterTbReq   req = {0};
  SDecoder       dcoder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;
  SEncoder       coder = {0};

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);
  RAW_RETURN_CHECK(tDecodeSVAlterTbReq(&dcoder, &req));
  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    uInfo(LOG_ID_TAG " alter table action is UPDATE_OPTIONS, ignore", LOG_ID_VALUE);
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
  pArray = taosArrayInit(1, sizeof(void*));
  RAW_NULL_CHECK(pArray);

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  RAW_NULL_CHECK(pVgData);
  pVgData->vg = pInfo;

  // first pass computes the encoded size, second pass writes behind the message head
  int tlen = 0;
  req.source = TD_REQ_FROM_TAOX;
  tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
  RAW_RETURN_CHECK(code);
  tlen += sizeof(SMsgHead);
  void* pMsg = taosMemoryMalloc(tlen);
  RAW_NULL_CHECK(pMsg);
  // hand the buffer to pVgData right away so the cleanup at `end` frees it if encoding fails
  pVgData->pData = pMsg;
  pVgData->size = tlen;
  pVgData->numOfTables = 1;

  ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
  ((SMsgHead*)pMsg)->contLen = htonl(tlen);
  void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
  code = tEncodeSVAlterTbReq(&coder, &req);
  RAW_RETURN_CHECK(code);

  RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));

  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (NULL == pQuery) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);

  // the query now references the array and the data block; do not free them at `end`
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
    code = TSDB_CODE_SUCCESS;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
    if (pRes->res != NULL) {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));

end:
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&dcoder);
  qDestroyQuery(pQuery);
  taosArrayDestroy(req.pMultiTag);
  tEncoderClear(&coder);
  RAW_LOG_END
  return code;
}
1764

1765
/*
 * Convenience wrapper around taos_write_raw_block_with_fields_with_reqid()
 * that passes 0 as the request id.
 */
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
                                     int numFields) {
  const int64_t reqid = 0;
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, reqid);
}
1769

1770
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
266✔
1771
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
1772
  if (taos == NULL || pData == NULL || tbname == NULL) {
266✔
1773
    uError("invalid parameter in %s", __func__);
×
1774
    return TSDB_CODE_INVALID_PARA;
×
1775
  }
1776
  int32_t     code = TSDB_CODE_SUCCESS;
266✔
1777
  int32_t     lino = 0;
266✔
1778
  STableMeta* pTableMeta = NULL;
266✔
1779
  SQuery*     pQuery = NULL;
266✔
1780
  SHashObj*   pVgHash = NULL;
266✔
1781

1782
  SRequestObj* pRequest = NULL;
266✔
1783
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
266✔
1784

1785
  uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
266✔
1786
         rows, pData, tbname, fields, numFields);
1787

1788
  pRequest->syncQuery = true;
266✔
1789
  if (!pRequest->pDb) {
266✔
1790
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
1791
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1792
    goto end;
×
1793
  }
1794

1795
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
266✔
1796
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
266✔
1797
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
266✔
1798

1799
  struct SCatalog* pCatalog = NULL;
266✔
1800
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
266✔
1801

1802
  SRequestConnInfo conn = {0};
266✔
1803
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
266✔
1804
  conn.requestId = pRequest->requestId;
266✔
1805
  conn.requestObjRefId = pRequest->self;
266✔
1806
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
266✔
1807

1808
  SVgroupInfo vgData = {0};
266✔
1809
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
266✔
1810
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
266✔
1811
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
266✔
1812
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
266✔
1813
  RAW_NULL_CHECK(pVgHash);
266✔
1814
  RAW_RETURN_CHECK(
266✔
1815
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1816
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
266✔
1817
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
266✔
1818

1819
  launchQueryImpl(pRequest, pQuery, true, NULL);
266✔
1820
  code = pRequest->code;
266✔
1821
  uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code));
266✔
1822

1823
end:
266✔
1824
  taosMemoryFreeClear(pTableMeta);
266✔
1825
  qDestroyQuery(pQuery);
266✔
1826
  destroyRequest(pRequest);
266✔
1827
  taosHashCleanup(pVgHash);
266✔
1828
  RAW_LOG_END
266✔
1829
  return code;
266✔
1830
}
1831

1832
/*
 * Convenience wrapper around taos_write_raw_block_with_reqid() that passes 0
 * as the request id.
 */
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  const int64_t reqid = 0;
  return taos_write_raw_block_with_reqid(taos, rows, pData, tbname, reqid);
}
1835

1836
/*
 * Write one raw data block into table `tbname` of the database currently
 * selected for the connection `taos`. No field description is passed to
 * rawBlockBindData().
 *
 * `rows` is only used for logging here. `reqid` is forwarded to buildRequest().
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
  if (taos == NULL || pData == NULL || tbname == NULL) {
    // log the rejection like the other entry points of this file do
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  STableMeta* pTableMeta = NULL;
  SQuery*     pQuery = NULL;
  SHashObj*   pVgHash = NULL;

  SRequestObj* pRequest = NULL;
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));

  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // fully qualified name of the target table
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));

  struct SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  // resolve the vgroup and the schema of the table, then bind the block to it
  SVgroupInfo vgData = {0};
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgHash);
  RAW_RETURN_CHECK(
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false));
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;
  uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  taosMemoryFreeClear(pTableMeta);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  taosHashCleanup(pVgHash);
  RAW_LOG_END
  return code;
}
1894

1895
static void* getRawDataFromRes(void* pRetrieve) {
32,281✔
1896
  if (pRetrieve == NULL) {
32,281✔
1897
    uError("invalid parameter in %s", __func__);
×
1898
    return NULL;
×
1899
  }
1900
  void* rawData = NULL;
32,281✔
1901
  // deal with compatibility
1902
  if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
32,281✔
1903
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
1904
  } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
32,281✔
1905
             *(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
×
1906
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
32,281✔
1907
  }
1908
  return rawData;
32,281✔
1909
}
1910

1911
/*
 * Decode every create-table request carried by `rsp` and store it in
 * `pHashObj`, keyed by table name. A request whose name is already present is
 * destroyed instead of stored. Only child tables are accepted; any other table
 * type aborts with TSDB_CODE_INVALID_MSG.
 *
 * Returns 0 on success or an error code.
 */
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
  if (rsp == NULL || pHashObj == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // find schema data info
  int32_t       code = 0;
  int32_t       lino = 0;
  SVCreateTbReq pCreateReq = {0};
  SDecoder      decoderTmp = {0};
  RAW_LOG_START
  for (int idx = 0; idx < rsp->createTableNum; idx++) {
    void** ppData = taosArrayGet(rsp->createTableReq, idx);
    RAW_NULL_CHECK(ppData);
    int32_t* pLen = taosArrayGet(rsp->createTableLen, idx);
    RAW_NULL_CHECK(pLen);

    tDecoderInit(&decoderTmp, *ppData, *pLen);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));

    if (pCreateReq.type != TSDB_CHILD_TABLE) {
      uError("invalid table type %d in %s", pCreateReq.type, __func__);
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }

    size_t nameLen = strlen(pCreateReq.name);
    if (taosHashGet(pHashObj, pCreateReq.name, nameLen) != NULL) {
      // duplicate name: keep the stored entry and drop this one
      tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
    } else {
      RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, nameLen, &pCreateReq, sizeof(SVCreateTbReq)));
    }

    tDecoderClear(&decoderTmp);
    // reset so the cleanup at `end` does not destroy a request that was stored or already destroyed
    pCreateReq = (SVCreateTbReq){0};
  }

end:
  tDecoderClear(&decoderTmp);
  tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
  RAW_LOG_END
  return code;
}
1953

1954
typedef enum {
1955
  WRITE_RAW_INIT_START = 0,
1956
  WRITE_RAW_INIT_OK,
1957
  WRITE_RAW_INIT_FAIL,
1958
} WRITE_RAW_INIT_STATUS;
1959

1960
static SHashObj* writeRawCache = NULL;
1961
static int8_t    initFlag = 0;
1962
static int8_t    initedFlag = WRITE_RAW_INIT_START;
1963

1964
typedef struct {
1965
  SHashObj* pVgHash;
1966
  SHashObj* pNameHash;
1967
  SHashObj* pMetaHash;
1968
} rawCacheInfo;
1969

1970
typedef struct {
1971
  SVgroupInfo vgInfo;
1972
  int64_t     uid;
1973
  int64_t     suid;
1974
} tbInfo;
1975

1976
static void tmqFreeMeta(void* data) {
9,684✔
1977
  if (data == NULL) {
9,684✔
1978
    uError("invalid parameter in %s", __func__);
×
1979
    return;
×
1980
  }
1981
  STableMeta* pTableMeta = *(STableMeta**)data;
9,684✔
1982
  taosMemoryFree(pTableMeta);
9,684✔
1983
}
1984

1985
static void freeRawCache(void* data) {
×
1986
  if (data == NULL) {
×
1987
    uError("invalid parameter in %s", __func__);
×
1988
    return;
×
1989
  }
1990
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
1991
  taosHashCleanup(pRawCache->pMetaHash);
×
1992
  taosHashCleanup(pRawCache->pNameHash);
×
1993
  taosHashCleanup(pRawCache->pVgHash);
×
1994
}
1995

1996
static int32_t initRawCacheHash() {
3,008✔
1997
  if (writeRawCache == NULL) {
3,008✔
1998
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
3,008✔
1999
    if (writeRawCache == NULL) {
3,008✔
2000
      return terrno;
×
2001
    }
2002
    taosHashSetFreeFp(writeRawCache, freeRawCache);
3,008✔
2003
  }
2004
  return 0;
3,008✔
2005
}
2006

2007
/*
 * Decide whether the cached table meta no longer matches the schema carried by a raw block.
 *
 * Returns true when the column counts differ, when a block column name is not present in the
 * cached meta, or when checkSchema() rejects a matching column. Returns false when everything
 * matches, and also when any argument is NULL (a NULL pTableMeta is additionally logged).
 */
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
  if (rawData == NULL || pSW == NULL) {
    return false;
  }
  if (pTableMeta == NULL) {
    uError("invalid parameter in %s", __func__);
    return false;
  }

  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
  // column length |
  // Step over five int32-sized fields and one uint64-sized field to land on the column schema.
  char*   cursor = (char*)rawData + 5 * sizeof(int32_t) + sizeof(uint64_t);
  int8_t* fields = (int8_t*)cursor;

  if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
    return true;
  }

  // Each column schema entry in the block occupies sizeof(int8_t) + sizeof(int32_t) bytes.
  for (int i = 0; i < pSW->nCols; i++, fields += sizeof(int8_t) + sizeof(int32_t)) {
    const char* fieldName = pSW->pSchema[i].name;
    bool        matched = false;

    for (int j = 0; j < pTableMeta->tableInfo.numOfColumns && !matched; j++) {
      SSchema* pColSchema = &pTableMeta->schema[j];
      if (strcmp(pColSchema->name, fieldName) != 0) {
        continue;
      }
      if (checkSchema(pColSchema, &pTableMeta->schemaExt[j], fields, NULL, 0) != 0) {
        return true;
      }
      matched = true;
    }

    if (!matched) {
      return true;  // block column is unknown to the cached meta
    }
  }
  return false;
}
2050

2051
/*
 * Look up (or create and register) the per-connection cache hashes for key in writeRawCache.
 *
 * pVgHash/pNameHash/pMetaHash  out: the three cache tables; all set to NULL on failure
 * key                          the TAOS handle pointer; the pointer value itself is the hash key
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the failing call's code.
 * On failure any hash created here is cleaned up before returning.
 */
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t   code = 0;
  int32_t   lino = 0;
  // Work on locals so the error path never touches whatever the caller's out-params held on
  // entry, and so the caller is never handed pointers to hashes that were already cleaned up.
  SHashObj* vgHash = NULL;
  SHashObj* nameHash = NULL;
  SHashObj* metaHash = NULL;
  RAW_LOG_START
  void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
  if (cacheInfo == NULL) {
    vgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(vgHash);
    nameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(nameHash);
    metaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(metaHash);
    taosHashSetFreeFp(metaHash, tmqFreeMeta);
    rawCacheInfo info = {vgHash, nameHash, metaHash};
    RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));
  } else {
    rawCacheInfo* info = (rawCacheInfo*)cacheInfo;
    vgHash = info->pVgHash;
    nameHash = info->pNameHash;
    metaHash = info->pMetaHash;
  }

end:
  if (code != 0) {
    // Failures only happen while creating a new entry, so the locals are exclusively ours here.
    taosHashCleanup(metaHash);
    taosHashCleanup(nameHash);
    taosHashCleanup(vgHash);
    metaHash = NULL;
    nameHash = NULL;
    vgHash = NULL;
  }
  *pVgHash = vgHash;
  *pNameHash = nameHash;
  *pMetaHash = metaHash;
  RAW_LOG_END
  return code;
}
2086

2087
/*
 * Create a synchronous request object for a raw write and fill the catalog connection info.
 *
 * taos      connection handle; dereferenced as an int64_t id for buildRequest()
 * pRequest  out: the new request (left for the caller to destroy, also when this function fails)
 * pCatalog  out: catalog handle of the connection's cluster
 * conn      out: transport, request id, request ref id and mgmt endpoints
 *
 * Returns 0 on success, TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database,
 * TSDB_CODE_INVALID_PARA on NULL arguments, or the failing call's code.
 */
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
  if (taos == NULL || pRequest == NULL || pCatalog == NULL || conn == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t      code = 0;
  int32_t      lino = 0;
  SRequestObj* req = NULL;
  RAW_LOG_START
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));

  req = *pRequest;
  req->syncQuery = true;
  if (req->pDb == NULL) {
    uError("%s no database selected", __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  RAW_RETURN_CHECK(catalogGetHandle(req->pTscObj->pAppInfo->clusterId, pCatalog));
  conn->pTrans = req->pTscObj->pAppInfo->pTransporter;
  conn->requestId = req->requestId;
  conn->requestObjRefId = req->self;
  conn->mgmtEps = getEpSet_s(&req->pTscObj->pAppInfo->mgmtEp);

end:
  RAW_LOG_END
  return code;
}
2113

2114
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
2115
static int32_t  decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func,
10,840✔
2116
                              SMqRspObj* rspObj) {
2117
  if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) {
10,840✔
2118
    uError("invalid parameter in %s", __func__);
×
2119
    return TSDB_CODE_INVALID_PARA;
×
2120
  }
2121
  int8_t dataVersion = *(int8_t*)data;
10,840✔
2122
  if (dataVersion >= MQ_DATA_RSP_VERSION) {
10,840✔
2123
    data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
10,840✔
2124
    if (dataLen < sizeof(int8_t) + sizeof(int32_t)) {
10,840✔
2125
      return TSDB_CODE_INVALID_PARA;
×
2126
    }
2127
    dataLen -= sizeof(int8_t) + sizeof(int32_t);
10,840✔
2128
  }
2129

2130
  rspObj->resIter = -1;
10,840✔
2131
  tDecoderInit(decoder, data, dataLen);
10,840✔
2132
  int32_t code = func(decoder, &rspObj->dataRsp);
10,840✔
2133
  if (code != 0) {
10,840✔
2134
    SET_ERROR_MSG("decode mq taosx data rsp failed");
×
2135
  }
2136
  return code;
10,840✔
2137
}
2138

2139
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
32,281✔
2140
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
2141
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
2142
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
32,281✔
2143
      pMeta == NULL) {
2144
    uError("invalid parameter in %s", __func__);
×
2145
    return TSDB_CODE_INVALID_PARA;
×
2146
  }
2147
  int32_t code = 0;
32,281✔
2148
  int32_t lino = 0;
32,281✔
2149
  RAW_LOG_START
32,281✔
2150
  STableMeta* pTableMeta = NULL;
32,281✔
2151
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
32,281✔
2152
  if (tmpInfo == NULL || retry > 0) {
32,281✔
2153
    tbInfo info = {0};
28,764✔
2154

2155
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
28,764✔
2156
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
28,764✔
2157
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
3,248✔
2158
    }
2159
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
28,764✔
2160
    info.uid = pTableMeta->uid;
28,764✔
2161
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
28,764✔
2162
      info.suid = pTableMeta->suid;
21,496✔
2163
    } else {
2164
      info.suid = pTableMeta->uid;
7,268✔
2165
    }
2166
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
28,764✔
2167
    RAW_RETURN_CHECK(code);
28,764✔
2168

2169
    uDebug("put table meta to hash1, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d", info.suid,
28,764✔
2170
           taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2171
    if (pCreateReqDst) {
28,764✔
2172
      pTableMeta->vgId = info.vgInfo.vgId;
3,248✔
2173
      pTableMeta->uid = pCreateReqDst->uid;
3,248✔
2174
      pCreateReqDst->ctb.suid = pTableMeta->suid;
3,248✔
2175
    }
2176

2177
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
28,764✔
2178
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
28,764✔
2179
    RAW_RETURN_CHECK(
28,764✔
2180
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
2181
  }
2182

2183
  if (pTableMeta == NULL || retry > 0) {
32,281✔
2184
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
3,517✔
2185
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
3,517✔
2186
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
1,080✔
2187
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
1,080✔
2188
      RAW_RETURN_CHECK(code);
1,080✔
2189
      uDebug("put table meta to hash2, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d",
1,080✔
2190
             tmpInfo->suid, taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2191
    } else {
2192
      pTableMeta = *pTableMetaTmp;
2,437✔
2193
      pTableMeta->uid = tmpInfo->uid;
2,437✔
2194
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
2,437✔
2195
    }
2196
  }
2197
  *pMeta = pTableMeta;
32,281✔
2198
  pTableMeta = NULL;
32,281✔
2199

2200
end:
32,281✔
2201
  taosMemoryFree(pTableMeta);
32,281✔
2202
  RAW_LOG_END
32,281✔
2203
  return code;
32,281✔
2204
}
2205

2206
/*
 * Write a RES_TYPE__TMQ raw payload: decode it, bind every block to its table and submit.
 *
 * The whole bind-and-submit pass is repeated (up to 3 extra times) while the request fails with
 * a code for which NEED_CLIENT_HANDLE_ERROR() is true; repeated passes run with retry > 0, which
 * makes processCacheMeta() fetch table information again.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the first failing code.
 */
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int              retry = 0;
  SQuery*          pQuery = NULL;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};
  SRequestConnInfo conn = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);

    // resIter starts at -1 (set by decodeRawData and again before every retry).
    for (rspObj.resIter++; rspObj.resIter < rspObj.dataRsp.blockNum; rspObj.resIter++) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
      RAW_NULL_CHECK(pSW);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName tblName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(tblName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(tblName.tname, tbName, TSDB_TABLE_NAME_LEN);

      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &tblName, &pTableMeta,
                                        pSW, rawData, retry));

      char err[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", tblName.tname, err);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // Stop unless the error is client-recoverable and the retry budget is not used up.
    if (!NEED_CLIENT_HANDLE_ERROR(code) || retry++ >= 3) {
      break;
    }
    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;
  }
  uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  RAW_LOG_END
  return code;
}
2280

2281
/*
 * Write a RES_TYPE__TMQ_METADATA raw payload: data blocks that may come with create-table
 * requests for tables to be created on the fly.
 *
 * The create-table requests are collected into a name-keyed hash by buildCreateTbMap(); each
 * block is bound together with the request found under its table name (if any). The
 * bind-and-submit pass is repeated (up to 3 extra times) while the request fails with a code for
 * which NEED_CLIENT_HANDLE_ERROR() is true.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the first failing code.
 */
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int              retry = 0;
  SQuery*          pQuery = NULL;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SHashObj*        pCreateTbHash = NULL;
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};
  SRequestConnInfo conn = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));

  pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pCreateTbHash);
  RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));

  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);

    // resIter starts at -1 (set by decodeRawData and again before every retry).
    for (rspObj.resIter++; rspObj.resIter < rspObj.dataRsp.blockNum; rspObj.resIter++) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
      RAW_NULL_CHECK(pSW);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName tblName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(tblName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(tblName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // find schema data info
      SVCreateTbReq* pCreateReqDst =
          (SVCreateTbReq*)taosHashGet(pCreateTbHash, tblName.tname, strlen(tblName.tname));
      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &tblName,
                                        &pTableMeta, pSW, rawData, retry));

      char err[ERR_MSG_LEN] = {0};
      code =
          rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", tblName.tname, err);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // Stop unless the error is client-recoverable and the retry budget is not used up.
    if (!NEED_CLIENT_HANDLE_ERROR(code) || retry++ >= 3) {
      break;
    }
    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;
  }
  uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteSTaosxRsp(&rspObj.dataRsp);
  // Release every decoded create-table request before dropping the hash that stores them.
  for (void* pIter = taosHashIterate(pCreateTbHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pCreateTbHash, pIter)) {
    tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
  }
  taosHashCleanup(pCreateTbHash);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  RAW_LOG_END
  return code;
}
2370

2371
/*
 * Write a RES_TYPE__TMQ_RAWDATA payload: blocks are handed to rawBlockBindRawData() and grouped
 * per vgroup in the statement's pVgDataBlocks array, without using the block schema.
 *
 * A scratch vgroup hash is created for every pass and dropped once all blocks are bound. The
 * bind-and-submit pass is repeated (up to 3 extra times) while the request fails with a code for
 * which NEED_CLIENT_HANDLE_ERROR() is true.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the first failing code.
 */
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int              retry = 0;
  SQuery*          pQuery = NULL;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SHashObj*        pVgroupHash = NULL;
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};
  SRequestConnInfo conn = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
    pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(pVgroupHash);
    pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
    RAW_NULL_CHECK(pStmt->pVgDataBlocks);

    // resIter starts at -1 (set by decodeRawData and again before every retry).
    for (rspObj.resIter++; rspObj.resIter < rspObj.dataRsp.blockNum; rspObj.resIter++) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName tblName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(tblName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(tblName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // find schema data info
      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &tblName, &pTableMeta,
                                        NULL, NULL, retry));

      // err is never filled by the bind call below, so the message carries an empty detail.
      char err[ERR_MSG_LEN] = {0};
      code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", tblName.tname, err);
        goto end;
      }
    }
    taosHashCleanup(pVgroupHash);
    pVgroupHash = NULL;

    RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // Stop unless the error is client-recoverable and the retry budget is not used up.
    if (!NEED_CLIENT_HANDLE_ERROR(code) || retry++ >= 3) {
      break;
    }
    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;
  }
  uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  taosHashCleanup(pVgroupHash);
  destroyRequest(pRequest);
  RAW_LOG_END
  return code;
}
2456

2457
/*
 * Convert one meta response to JSON by dispatching on its message type.
 *
 * meta  out: written by the type-specific handler; left untouched for message types that are
 *       not handled here (the function still returns 0 in that case)
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the handler's error code.
 */
static int32_t processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
  if (pMetaRsp == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  RAW_LOG_START
  switch (pMetaRsp->resMsgType) {
    case TDMT_VND_CREATE_STB:
      RAW_RETURN_CHECK(processCreateStb(pMetaRsp, meta));
      break;
    case TDMT_VND_ALTER_STB:
      RAW_RETURN_CHECK(processAlterStb(pMetaRsp, meta));
      break;
    case TDMT_VND_DROP_STB:
      RAW_RETURN_CHECK(processDropSTable(pMetaRsp, meta));
      break;
    case TDMT_VND_CREATE_TABLE:
      RAW_RETURN_CHECK(processCreateTable(pMetaRsp, meta));
      break;
    case TDMT_VND_ALTER_TABLE:
      RAW_RETURN_CHECK(processAlterTable(pMetaRsp, meta));
      break;
    case TDMT_VND_DROP_TABLE:
      RAW_RETURN_CHECK(processDropTable(pMetaRsp, meta));
      break;
    case TDMT_VND_DELETE:
      RAW_RETURN_CHECK(processDeleteTable(pMetaRsp, meta));
      break;
    default:
      // other message types produce no JSON
      break;
  }

end:
  RAW_LOG_END
  return code;
}
2485

2486
/*
 * Render a batch meta response as a JSON string of the form
 *   {"tmq_meta_version": TMQ_META_VERSION, "metas": [ ...one item per convertible meta... ]}
 *
 * string  out: result of cJSON_PrintUnformatted() (may be NULL if printing fails); written only
 *         when every step before it succeeded
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, or the first failing code.
 */
static int32_t processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
  if (pMsgRsp == NULL || string == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDecoder        coder = {0};
  SMqBatchMetaRsp rsp = {0};
  int32_t         code = 0;
  int32_t         lino = 0;
  cJSON*          pJson = NULL;
  tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
  RAW_RETURN_CHECK(tDecodeMqBatchMetaRsp(&coder, &rsp));

  pJson = cJSON_CreateObject();
  RAW_NULL_CHECK(pJson);
  RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION));
  cJSON* pMetaArr = cJSON_CreateArray();
  RAW_NULL_CHECK(pMetaArr);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(pJson, "metas", pMetaArr));

  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    cJSON*     pItem = NULL;
    // Each entry is a SMqRspHead followed by the encoded meta response.
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    code = tDecodeMqMetaRsp(&metaCoder, &metaRsp);
    if (code == 0) {
      code = processSimpleMeta(&metaRsp, &pItem);
    }
    // Release the per-entry response and decoder on every path, not only after success.
    tDeleteMqMetaRsp(&metaRsp);
    tDecoderClear(&metaCoder);
    if (code != 0) {
      lino = __LINE__;
      goto end;
    }
    if (pItem != NULL) RAW_FALSE_CHECK(tmqAddJsonArrayItem(pMetaArr, pItem));
  }

  char* fullStr = cJSON_PrintUnformatted(pJson);
  *string = fullStr;

end:
  cJSON_Delete(pJson);
  tDeleteMqBatchMetaRsp(&rsp);
  // Matches the tDecoderClear() calls made for every other decoder in this file.
  tDecoderClear(&coder);
  RAW_LOG_END
  return code;
}
2531

2532
/*
 * Return the JSON description of a TMQ meta, metadata or batch-meta result.
 *
 * Returns a string for the caller to release with tmq_free_json_meta(), or NULL when res is
 * NULL, is not one of the three supported result kinds, or the conversion fails.
 */
char* tmq_get_json_meta(TAOS_RES* res) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  char*      string = NULL;
  SMqRspObj* rspObj = NULL;
  RAW_LOG_START
  RAW_NULL_CHECK(res);
  RAW_FALSE_CHECK(TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) || TD_RES_TMQ_BATCH_META(res));

  rspObj = (SMqRspObj*)res;
  if (TD_RES_TMQ_METADATA(res)) {
    RAW_RETURN_CHECK(processAutoCreateTable(&rspObj->dataRsp, &string));
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    RAW_RETURN_CHECK(processBatchMetaToJson(&rspObj->batchMetaRsp, &string));
  } else if (TD_RES_TMQ_META(res)) {
    cJSON* pRoot = NULL;
    RAW_RETURN_CHECK(processSimpleMeta(&rspObj->metaRsp, &pRoot));
    string = cJSON_PrintUnformatted(pRoot);
    cJSON_Delete(pRoot);
  } else {
    // Not expected: the type check above already rejected every other result kind.
    uError("tmq_get_json_meta res:%d, invalid type", *(int8_t*)res);
  }

  uDebug("tmq_get_json_meta string:%s", string);

end:
  RAW_LOG_END
  return string;
}
2560

2561
/* Release a string returned by tmq_get_json_meta(). */
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}
63,409✔
2562

2563
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
10,882✔
2564
  if (pRsp == NULL) {
10,882✔
2565
    uError("invalid parameter in %s", __func__);
×
2566
    return TSDB_CODE_INVALID_PARA;
×
2567
  }
2568
  int32_t pos = 0;
10,882✔
2569
  int32_t code = 0;
10,882✔
2570
  int32_t lino = 0;
10,882✔
2571
  RAW_LOG_START
10,882✔
2572
  SEncoder coder = {0};
10,882✔
2573
  tEncoderInit(&coder, NULL, 0);
10,882✔
2574
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset));
10,882✔
2575
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset));
10,882✔
2576
  pos = coder.pos;
10,882✔
2577
  tEncoderClear(&coder);
10,882✔
2578

2579
end:
10,882✔
2580
  if (code != 0) {
10,882✔
2581
    uError("getOffSetLen failed, code:%d", code);
×
2582
    return code;
×
2583
  } else {
2584
    uDebug("getOffSetLen success, len:%d", pos);
10,882✔
2585
    return pos;
10,882✔
2586
  }
2587
}
2588

2589
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
2590
static int32_t  encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
10,882✔
2591
  if (raw == NULL || encodeFunc == NULL || rspObj == NULL) {
10,882✔
2592
    uError("invalid parameter in %s", __func__);
×
2593
    return TSDB_CODE_INVALID_PARA;
×
2594
  }
2595
  uint32_t len = 0;
10,882✔
2596
  int32_t  code = 0;
10,882✔
2597
  int32_t  lino = 0;
10,882✔
2598
  SEncoder encoder = {0};
10,882✔
2599
  void*    buf = NULL;
10,882✔
2600
  tEncodeSize(encodeFunc, rspObj, len, code);
10,882✔
2601
  RAW_FALSE_CHECK(code >= 0);
10,882✔
2602
  len += sizeof(int8_t) + sizeof(int32_t);
10,882✔
2603
  buf = taosMemoryCalloc(1, len);
10,882✔
2604
  RAW_NULL_CHECK(buf);
10,882✔
2605
  tEncoderInit(&encoder, buf, len);
10,882✔
2606
  RAW_RETURN_CHECK(tEncodeI8(&encoder, MQ_DATA_RSP_VERSION));
10,882✔
2607
  int32_t offsetLen = getOffSetLen(rspObj);
10,882✔
2608
  RAW_FALSE_CHECK(offsetLen > 0);
10,882✔
2609
  RAW_RETURN_CHECK(tEncodeI32(&encoder, offsetLen));
10,882✔
2610
  RAW_RETURN_CHECK(encodeFunc(&encoder, rspObj));
10,882✔
2611

2612
  raw->raw = buf;
10,882✔
2613
  buf = NULL;
10,882✔
2614
  raw->raw_len = len;
10,882✔
2615

2616
end:
10,882✔
2617
  RAW_LOG_END
10,882✔
2618
  return code;
10,882✔
2619
}
2620

2621
/*
 * Export a TMQ result as a tmq_raw_data record, to be released with tmq_free_raw().
 *
 *   meta / batch meta : raw->raw aliases the buffer held by res
 *   data / metadata   : raw->raw is a newly encoded buffer (see encodeMqDataRsp)
 *   raw data          : the buffer is moved out of res (res's rawData pointer is set to NULL)
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA on NULL arguments, TSDB_CODE_TMQ_INVALID_MSG for
 * an unsupported result kind, or the encoder's error code. *raw is zeroed before being filled.
 */
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (raw == NULL || res == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  SMqRspObj* rspObj = (SMqRspObj*)res;
  RAW_LOG_START
  *raw = (tmq_raw_data){0};

  if (TD_RES_TMQ_META(res)) {
    raw->raw = rspObj->metaRsp.metaRsp;
    // a negative length is reported as 0
    if (rspObj->metaRsp.metaRspLen >= 0) {
      raw->raw_len = rspObj->metaRsp.metaRspLen;
    } else {
      raw->raw_len = 0;
    }
    raw->raw_type = rspObj->metaRsp.resMsgType;
    uDebug("tmq get raw type meta:%p", raw);
  } else if (TD_RES_TMQ(res)) {
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw));
    raw->raw_type = RES_TYPE__TMQ;
    uDebug("tmq get raw type data:%p", raw);
  } else if (TD_RES_TMQ_METADATA(res)) {
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw));
    raw->raw_type = RES_TYPE__TMQ_METADATA;
    uDebug("tmq get raw type metadata:%p", raw);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    raw->raw = rspObj->batchMetaRsp.pMetaBuff;
    raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw batch meta:%p", raw);
  } else if (TD_RES_TMQ_RAW(res)) {
    // hand the buffer over to the caller; res no longer references it
    raw->raw = rspObj->dataRsp.rawData;
    rspObj->dataRsp.rawData = NULL;
    raw->raw_len = rspObj->dataRsp.len;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw raw:%p", raw);
  } else {
    uError("tmq get raw error type:%d", *(int8_t*)res);
    code = TSDB_CODE_TMQ_INVALID_MSG;
  }

end:
  RAW_LOG_END
  return code;
}
2664

2665
/*
 * Release a record produced by tmq_get_raw() and reset the thread's error message buffer.
 *
 * Only data, metadata and raw-data records own memory; for every other raw_type nothing is freed.
 */
void tmq_free_raw(tmq_raw_data raw) {
  uDebug("tmq free raw data type:%d", raw.raw_type);
  if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
    taosMemoryFree(raw.raw);
  } else if (raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL) {
    // The payload pointer is treated as sitting sizeof(SMqRspHead) bytes into its allocation, so
    // step back to the allocation start. Done as an explicit subtraction: negating the unsigned
    // sizeof and adding it relied on the pointer addition wrapping around.
    taosMemoryFree((char*)raw.raw - sizeof(SMqRspHead));
  }
  (void)memset(terrMsg, 0, ERR_MSG_LEN);
}
62,898✔
2674

2675
static int32_t writeRawInit() {
89,859✔
2676
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
92,867✔
2677
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
3,008✔
2678
    if (old == 0) {
3,008✔
2679
      int32_t code = initRawCacheHash();
3,008✔
2680
      if (code != 0) {
3,008✔
2681
        uError("tmq writeRawImpl init error:%d", code);
×
2682
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
2683
        return code;
×
2684
      }
2685
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
3,008✔
2686
    }
2687
  }
2688

2689
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
89,859✔
2690
    return TSDB_CODE_INTERNAL_ERROR;
×
2691
  }
2692
  return 0;
89,859✔
2693
}
2694

2695
// Route one raw buffer to the handler that matches its type tag.
//
// taos : connection handle, must not be NULL
// buf  : payload handed unchanged to the selected handler, must not be NULL
// len  : payload length in bytes
// type : either a TDMT_VND_* message type or a RES_TYPE__TMQ* result type
//
// Returns the handler's result; TSDB_CODE_INVALID_PARA for NULL arguments or an unrecognized
// type; TSDB_CODE_INTERNAL_ERROR when writeRawInit() reports a failure.
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
  if (taos == NULL || buf == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  if (writeRawInit() != 0) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  // Super-table create and alter share one handler.
  if (type == TDMT_VND_CREATE_STB || type == TDMT_VND_ALTER_STB) {
    return taosCreateStb(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_STB) {
    return taosDropStb(taos, buf, len);
  }
  if (type == TDMT_VND_CREATE_TABLE) {
    return taosCreateTable(taos, buf, len);
  }
  if (type == TDMT_VND_ALTER_TABLE) {
    return taosAlterTable(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_TABLE) {
    return taosDropTable(taos, buf, len);
  }
  if (type == TDMT_VND_DELETE) {
    return taosDeleteData(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_METADATA) {
    return tmqWriteRawMetaDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_RAWDATA) {
    return tmqWriteRawRawDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ) {
    return tmqWriteRawDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_BATCH_META) {
    return tmqWriteBatchMetaDataImpl(taos, buf, len);
  }

  // No handler matches this type tag.
  return TSDB_CODE_INVALID_PARA;
}
2729

2730
// Public entry point: write a tmq_raw_data value through the given connection.
//
// Rejects a NULL connection, a NULL payload or a zero length with TSDB_CODE_INVALID_PARA
// (after recording an error message); otherwise clears the global error message and returns
// whatever writeRawImpl() returns for the payload.
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
  if (taos != NULL && raw.raw != NULL && raw.raw_len > 0) {
    taosClearErrMsg();  // clear global error message
    return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
  }

  SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
  return TSDB_CODE_INVALID_PARA;
}
2739

2740
// Replay every meta entry contained in an encoded batch-meta response.
//
// The buffer is decoded into an SMqBatchMetaRsp; each entry in batchMetaReq (with its length
// taken from batchMetaLen) is decoded as an SMqMetaRsp after skipping the leading SMqRspHead,
// and its payload is passed back to writeRawImpl() with the entry's resMsgType as the type.
// Processing stops at the first entry that fails.
//
// taos    : connection handle, must not be NULL
// meta    : encoded batch-meta response, must not be NULL
// metaLen : size of meta in bytes
//
// Returns TSDB_CODE_SUCCESS when every entry was written, TSDB_CODE_INVALID_PARA for NULL
// arguments or an entry shorter than SMqRspHead, otherwise the first error encountered.
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SMqBatchMetaRsp rsp = {0};
  SDecoder        coder = {0};
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;

  RAW_LOG_START
  // decode and process req
  tDecoderInit(&coder, meta, metaLen);
  RAW_RETURN_CHECK(tDecodeMqBatchMetaRsp(&coder, &rsp));
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // An entry must at least hold the response head; without this check the subtraction below
    // underflows and the decoder is told the buffer is enormous.
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      uError("%s invalid meta len:%d at index:%d", __func__, *len, i);
      code = TSDB_CODE_INVALID_PARA;
      lino = __LINE__;
      goto end;
    }

    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - (int32_t)sizeof(SMqRspHead));
    RAW_RETURN_CHECK(tDecodeMqMetaRsp(&metaCoder, &metaRsp));
    code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
    // Release the decoded entry before looking at the result so it is freed on both paths.
    tDeleteMqMetaRsp(&metaRsp);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
  }

end:
  tDeleteMqBatchMetaRsp(&rsp);
  RAW_LOG_END
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc