• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4548

22 Jul 2025 02:37AM UTC coverage: 54.273% (-3.0%) from 57.287%
#4548

push

travis-ci

GitHub
Merge pull request #32061 from taosdata/new_testcases

132738 of 315239 branches covered (42.11%)

Branch coverage included in aggregate %.

201371 of 300373 relevant lines covered (67.04%)

3475977.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "parser.h"
19
#include "tcol.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tdef.h"
23
#include "tglobal.h"
24
#include "tmsgtype.h"
25

26
/*
 * Error-propagation helpers used throughout this file.
 * Every macro expects the enclosing function to provide an integer variable
 * named `code` and a label named `end`. The argument is parenthesized so that
 * compound expressions (e.g. `a && b`, `x ? y : z`) are tested as a whole.
 */

/* If pointer expression `c` is NULL: code = terrno, then jump to `end`. */
#define RAW_NULL_CHECK(c) \
  do {                    \
    if ((c) == NULL) {    \
      code = terrno;      \
      goto end;           \
    }                     \
  } while (0)

/* If expression `c` is false: code = TSDB_CODE_INVALID_PARA, then jump to `end`. */
#define RAW_FALSE_CHECK(c)           \
  do {                               \
    if (!(c)) {                      \
      code = TSDB_CODE_INVALID_PARA; \
      goto end;                      \
    }                                \
  } while (0)

/* Store the value of `c` in `code`; jump to `end` when it is non-zero. */
#define RAW_RETURN_CHECK(c) \
  do {                      \
    code = (c);             \
    if (code != 0) {        \
      goto end;             \
    }                       \
  } while (0)
49

50
#define LOG_ID_TAG   "connId:0x%" PRIx64 ", QID:0x%" PRIx64
51
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
52

53
#define TMQ_META_VERSION "1.0"
54

55
/*
 * Attach `item` to `object` under the key `string`.
 * When cJSON_AddItemToObject reports failure the item is deleted here, so the
 * caller never has to free `item` after this call.
 * Returns true if the item was attached, false otherwise.
 */
static bool tmqAddJsonObjectItem(cJSON* object, const char* string, cJSON* item) {
  if (cJSON_AddItemToObject(object, string, item)) {
    return true;
  }
  cJSON_Delete(item);
  return false;
}
62
/*
 * Append `item` to the JSON array `array`.
 * When cJSON_AddItemToArray reports failure the item is deleted here, so the
 * caller never has to free `item` after this call.
 * Returns true if the item was appended, false otherwise.
 */
static bool tmqAddJsonArrayItem(cJSON* array, cJSON* item) {
  if (cJSON_AddItemToArray(array, item)) {
    return true;
  }
  cJSON_Delete(item);
  return false;
}
69

70
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen);
71
/*
 * Returns `suid` unchanged when `db` is NULL; otherwise returns `suid` plus
 * MurmurHash3_32 computed over the bytes of the `db` string.
 */
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
  return (db == NULL) ? suid : suid + MurmurHash3_32(db, strlen(db));
}
77
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t,
×
78
                                 SColCmprWrapper* pColCmprRow, cJSON** pJson) {
79
  if (schemaRow == NULL || name == NULL || pColCmprRow == NULL || pJson == NULL) {
×
80
    uError("invalid parameter, schemaRow:%p, name:%p, pColCmprRow:%p, pJson:%p", schemaRow, name, pColCmprRow, pJson);
×
81
    return;
×
82
  }
83
  int32_t code = TSDB_CODE_SUCCESS;
×
84
  int8_t  buildDefaultCompress = 0;
×
85
  if (pColCmprRow->nCols <= 0) {
×
86
    buildDefaultCompress = 1;
×
87
  }
88

89
  char*  string = NULL;
×
90
  cJSON* json = cJSON_CreateObject();
×
91
  RAW_NULL_CHECK(json);
×
92
  cJSON* type = cJSON_CreateString("create");
×
93
  RAW_NULL_CHECK(type);
×
94

95
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
×
96
  cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
×
97
  RAW_NULL_CHECK(tableType);
×
98
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
×
99
  cJSON* tableName = cJSON_CreateString(name);
×
100
  RAW_NULL_CHECK(tableName);
×
101
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
×
102

103
  cJSON* columns = cJSON_CreateArray();
×
104
  RAW_NULL_CHECK(columns);
×
105
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "columns", columns));
×
106

107
  for (int i = 0; i < schemaRow->nCols; i++) {
×
108
    cJSON* column = cJSON_CreateObject();
×
109
    RAW_NULL_CHECK(column);
×
110
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(columns, column));
×
111
    SSchema* s = schemaRow->pSchema + i;
×
112
    cJSON*   cname = cJSON_CreateString(s->name);
×
113
    RAW_NULL_CHECK(cname);
×
114
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "name", cname));
×
115
    cJSON* ctype = cJSON_CreateNumber(s->type);
×
116
    RAW_NULL_CHECK(ctype);
×
117
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "type", ctype));
×
118
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
×
119
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
×
120
      cJSON*  cbytes = cJSON_CreateNumber(length);
×
121
      RAW_NULL_CHECK(cbytes);
×
122
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
×
123
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
×
124
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
×
125
      cJSON*  cbytes = cJSON_CreateNumber(length);
×
126
      RAW_NULL_CHECK(cbytes);
×
127
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
×
128
    }
129
    cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
×
130
    RAW_NULL_CHECK(isPk);
×
131
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "isPrimarykey", isPk));
×
132

133
    if (pColCmprRow == NULL) {
×
134
      continue;
×
135
    }
136

137
    uint32_t alg = 0;
×
138
    if (buildDefaultCompress) {
×
139
      alg = createDefaultColCmprByType(s->type);
×
140
    } else {
141
      SColCmpr* pColCmpr = pColCmprRow->pColCmpr + i;
×
142
      alg = pColCmpr->alg;
×
143
    }
144
    const char* encode = columnEncodeStr(COMPRESS_L1_TYPE_U32(alg));
×
145
    RAW_NULL_CHECK(encode);
×
146
    const char* compress = columnCompressStr(COMPRESS_L2_TYPE_U32(alg));
×
147
    RAW_NULL_CHECK(compress);
×
148
    const char* level = columnLevelStr(COMPRESS_L2_TYPE_LEVEL_U32(alg));
×
149
    RAW_NULL_CHECK(level);
×
150

151
    cJSON* encodeJson = cJSON_CreateString(encode);
×
152
    RAW_NULL_CHECK(encodeJson);
×
153
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "encode", encodeJson));
×
154

155
    cJSON* compressJson = cJSON_CreateString(compress);
×
156
    RAW_NULL_CHECK(compressJson);
×
157
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "compress", compressJson));
×
158

159
    cJSON* levelJson = cJSON_CreateString(level);
×
160
    RAW_NULL_CHECK(levelJson);
×
161
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "level", levelJson));
×
162
  }
163

164
  cJSON* tags = cJSON_CreateArray();
×
165
  RAW_NULL_CHECK(tags);
×
166
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
×
167

168
  for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
×
169
    cJSON* tag = cJSON_CreateObject();
×
170
    RAW_NULL_CHECK(tag);
×
171
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
×
172
    SSchema* s = schemaTag->pSchema + i;
×
173
    cJSON*   tname = cJSON_CreateString(s->name);
×
174
    RAW_NULL_CHECK(tname);
×
175
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
×
176
    cJSON* ttype = cJSON_CreateNumber(s->type);
×
177
    RAW_NULL_CHECK(ttype);
×
178
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
×
179
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
×
180
      int32_t length = s->bytes - VARSTR_HEADER_SIZE;
×
181
      cJSON*  cbytes = cJSON_CreateNumber(length);
×
182
      RAW_NULL_CHECK(cbytes);
×
183
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
×
184
    } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
×
185
      int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
×
186
      cJSON*  cbytes = cJSON_CreateNumber(length);
×
187
      RAW_NULL_CHECK(cbytes);
×
188
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
×
189
    }
190
  }
191

192
end:
×
193
  *pJson = json;
×
194
}
195

196
/*
 * Add one compression attribute decoded from `para` to `json`.
 *
 * The components are examined in the order encode (L1), compress (L2), level;
 * the first non-zero one is written as a string member ("encode", "compress"
 * or "level") and the function returns. If all three are zero nothing is added.
 * NOTE(review): later non-zero components are not emitted once an earlier one
 * is found - confirm this is intended for callers that set several at once.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA when `json` is NULL or the item
 * cannot be attached, or terrno when a name lookup or allocation yields NULL.
 */
static int32_t setCompressOption(cJSON* json, uint32_t para) {
  if (json == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  const char* key = NULL;
  const char* text = NULL;
  uint8_t     part = COMPRESS_L1_TYPE_U32(para);
  if (part != 0) {
    key = "encode";
    text = columnEncodeStr(part);
  } else if ((part = COMPRESS_L2_TYPE_U32(para)) != 0) {
    key = "compress";
    text = columnCompressStr(part);
  } else if ((part = COMPRESS_L2_TYPE_LEVEL_U32(para)) != 0) {
    key = "level";
    text = columnLevelStr(part);
  } else {
    return 0;  // nothing to describe
  }

  if (text == NULL) {
    return terrno;
  }
  cJSON* item = cJSON_CreateString(text);
  if (item == NULL) {
    return terrno;
  }
  if (!tmqAddJsonObjectItem(json, key, item)) {
    return TSDB_CODE_INVALID_PARA;
  }
  return 0;
}
232
/*
 * Deserialize an SMAlterStbReq from `alterData` and describe it as an "alter"
 * JSON object for a super table.
 *
 * The object always carries "type", "tableType", "tableName" and "alterType";
 * the remaining members depend on req.alterType (column name, type, length,
 * new name or compression attribute).
 *
 * *pJson is NULL when deserialization fails. If a later step fails the function
 * stops early and *pJson receives the partially built object. The decoded
 * request is released before returning.
 */
static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) {
  if (pJson == NULL || alterData == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  int32_t       code = 0;
  cJSON*        root = NULL;
  SMAlterStbReq req = {0};

  if (tDeserializeSMAlterStbReq(alterData, alterDataLen, &req) != 0) {
    goto end;
  }

  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);
  cJSON* typeItem = cJSON_CreateString("alter");
  RAW_NULL_CHECK(typeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "type", typeItem));

  // req.name is the full name; only the table part is exported
  SName fullName = {0};
  RAW_RETURN_CHECK(tNameFromString(&fullName, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
  cJSON* tableTypeItem = cJSON_CreateString("super");
  RAW_NULL_CHECK(tableTypeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableType", tableTypeItem));
  cJSON* tableNameItem = cJSON_CreateString(fullName.tname);
  RAW_NULL_CHECK(tableNameItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableName", tableNameItem));

  cJSON* alterTypeItem = cJSON_CreateNumber(req.alterType);
  RAW_NULL_CHECK(alterTypeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "alterType", alterTypeItem));

  switch (req.alterType) {
    // adding a column/tag and resizing one are described identically:
    // name, type and (for variable-length types) the length
    case TSDB_ALTER_TABLE_ADD_TAG:
    case TSDB_ALTER_TABLE_ADD_COLUMN:
    case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      TAOS_FIELD* pField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pField);
      cJSON* nameItem = cJSON_CreateString(pField->name);
      RAW_NULL_CHECK(nameItem);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "colName", nameItem));
      cJSON* colTypeItem = cJSON_CreateNumber(pField->type);
      RAW_NULL_CHECK(colTypeItem);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "colType", colTypeItem));

      if (pField->type == TSDB_DATA_TYPE_BINARY || pField->type == TSDB_DATA_TYPE_VARBINARY ||
          pField->type == TSDB_DATA_TYPE_GEOMETRY) {
        int32_t length = pField->bytes - VARSTR_HEADER_SIZE;
        cJSON*  lengthItem = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(lengthItem);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "colLength", lengthItem));
      } else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (pField->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  lengthItem = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(lengthItem);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "colLength", lengthItem));
      }
      break;
    }

    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      SFieldWithOptions* pField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pField);
      cJSON* nameItem = cJSON_CreateString(pField->name);
      RAW_NULL_CHECK(nameItem);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "colName", nameItem));
      cJSON* colTypeItem = cJSON_CreateNumber(pField->type);
      RAW_NULL_CHECK(colTypeItem);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "colType", colTypeItem));

      if (pField->type == TSDB_DATA_TYPE_BINARY || pField->type == TSDB_DATA_TYPE_VARBINARY ||
          pField->type == TSDB_DATA_TYPE_GEOMETRY) {
        int32_t length = pField->bytes - VARSTR_HEADER_SIZE;
        cJSON*  lengthItem = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(lengthItem);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "colLength", lengthItem));
      } else if (pField->type == TSDB_DATA_TYPE_NCHAR) {
        int32_t length = (pField->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        cJSON*  lengthItem = cJSON_CreateNumber(length);
        RAW_NULL_CHECK(lengthItem);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "colLength", lengthItem));
      }
      RAW_RETURN_CHECK(setCompressOption(root, pField->compress));
      break;
    }

    case TSDB_ALTER_TABLE_DROP_TAG:
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      TAOS_FIELD* pField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pField);
      cJSON* nameItem = cJSON_CreateString(pField->name);
      RAW_NULL_CHECK(nameItem);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "colName", nameItem));
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      // field 0 carries the current name, field 1 the new one
      TAOS_FIELD* pOld = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pOld);
      TAOS_FIELD* pNew = taosArrayGet(req.pFields, 1);
      RAW_NULL_CHECK(pNew);
      cJSON* nameItem = cJSON_CreateString(pOld->name);
      RAW_NULL_CHECK(nameItem);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "colName", nameItem));
      cJSON* newNameItem = cJSON_CreateString(pNew->name);
      RAW_NULL_CHECK(newNameItem);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "colNewName", newNameItem));
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      TAOS_FIELD* pField = taosArrayGet(req.pFields, 0);
      RAW_NULL_CHECK(pField);
      cJSON* nameItem = cJSON_CreateString(pField->name);
      RAW_NULL_CHECK(nameItem);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "colName", nameItem));
      // for this alter type the `bytes` member is what gets passed as the compression parameter
      RAW_RETURN_CHECK(setCompressOption(root, pField->bytes));
      break;
    }

    default:
      break;
  }

end:
  tFreeSMAltertbReq(&req);
  *pJson = root;
}
378

379
/*
 * Decode the SVCreateStbReq that follows the SMsgHead in `metaRsp` and, when
 * decoding succeeds, describe it as JSON through buildCreateTableJson.
 * *pJson is left untouched if the request cannot be decoded.
 */
static void processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDecoder       dec = {0};
  SVCreateStbReq stbReq = {0};

  uDebug("create stable data:%p", metaRsp);

  // the encoded request starts right after the message head
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  tDecoderInit(&dec, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&dec, &stbReq) >= 0) {
    buildCreateTableJson(&stbReq.schemaRow, &stbReq.schemaTag, stbReq.name, stbReq.suid, TSDB_SUPER_TABLE,
                         &stbReq.colCmpr, pJson);
  }

  uDebug("create stable return");
  tDecoderClear(&dec);
}
402

403
/*
 * Decode the SVCreateStbReq that follows the SMsgHead in `metaRsp` and, when
 * decoding succeeds, pass its original alter payload (alterOriData) to
 * buildAlterSTableJson. *pJson is left untouched if decoding fails.
 */
static void processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDecoder       dec = {0};
  SVCreateStbReq stbReq = {0};
  uDebug("alter stable data:%p", metaRsp);

  // the encoded request starts right after the message head
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  tDecoderInit(&dec, payload, payloadLen);

  if (tDecodeSVCreateStbReq(&dec, &stbReq) >= 0) {
    buildAlterSTableJson(stbReq.alterOriData, stbReq.alterOriDataLen, pJson);
  }

  uDebug("alter stable return");
  tDecoderClear(&dec);
}
426

427
/*
 * Add the description of one child table to `json`: "tableName", "using"
 * (the super table name), "tagNum" and a "tags" array.
 *
 * For a JSON tag the array holds a single entry whose value is the tag
 * rendered as text. Otherwise there is one entry per tag value: variable-length
 * values are converted to strings, all other values are exported as numbers.
 *
 * The function returns nothing; on any failure it stops early and leaves
 * `json` with whatever members were added so far.
 */
static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
  if (pCreateReq == NULL || json == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  int32_t       code = 0;
  char*         jsonText = NULL;  // textual form of a JSON tag, released at `end`
  SArray*       tagVals = NULL;   // decoded tag values, released at `end`
  STag*         pTag = (STag*)pCreateReq->ctb.pTag;
  SArray*       tagNames = pCreateReq->ctb.tagName;
  const uint8_t tagCount = pCreateReq->ctb.tagNum;

  cJSON* tableName = cJSON_CreateString(pCreateReq->name);
  RAW_NULL_CHECK(tableName);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
  cJSON* using = cJSON_CreateString(pCreateReq->ctb.stbName);
  RAW_NULL_CHECK(using);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "using", using));
  cJSON* tagNumJson = cJSON_CreateNumber(tagCount);
  RAW_NULL_CHECK(tagNumJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tagNum", tagNumJson));

  cJSON* tags = cJSON_CreateArray();
  RAW_NULL_CHECK(tags);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
  RAW_RETURN_CHECK(tTagToValArray(pTag, &tagVals));

  if (tTagIsJson(pTag)) {
    if (pTag->nTag == 0) {
      uError("p->nTag == 0");
      goto end;
    }
    parseTagDatatoJson(pTag, &jsonText, NULL);
    RAW_NULL_CHECK(jsonText);
    cJSON* tag = cJSON_CreateObject();
    RAW_NULL_CHECK(tag);
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
    // the first value itself is not used, but it must exist
    STagVal* firstVal = taosArrayGet(tagVals, 0);
    RAW_NULL_CHECK(firstVal);
    char* firstName = taosArrayGet(tagNames, 0);
    RAW_NULL_CHECK(firstName);
    cJSON* tname = cJSON_CreateString(firstName);
    RAW_NULL_CHECK(tname);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
    cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
    RAW_NULL_CHECK(ttype);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
    cJSON* tvalue = cJSON_CreateString(jsonText);
    RAW_NULL_CHECK(tvalue);
    RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
  } else {
    for (int idx = 0; idx < taosArrayGetSize(tagVals); idx++) {
      STagVal* pVal = (STagVal*)taosArrayGet(tagVals, idx);
      RAW_NULL_CHECK(pVal);
      cJSON* tag = cJSON_CreateObject();
      RAW_NULL_CHECK(tag);
      RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
      char* curName = taosArrayGet(tagNames, idx);
      RAW_NULL_CHECK(curName);
      cJSON* tname = cJSON_CreateString(curName);
      RAW_NULL_CHECK(tname);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
      cJSON* ttype = cJSON_CreateNumber(pVal->type);
      RAW_NULL_CHECK(ttype);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));

      cJSON* tvalue = NULL;
      if (!IS_VAR_DATA_TYPE(pVal->type)) {
        double num = 0;
        // currently tag type can't be decimal, so pass 0 as typeMod
        GET_TYPED_DATA(num, double, pVal->type, &pVal->i64, 0);
        tvalue = cJSON_CreateNumber(num);
        RAW_NULL_CHECK(tvalue);
      } else {
        if (IS_STR_DATA_BLOB(pVal->type)) {
          goto end;
        }
        // scratch buffer for the textual form; VARBINARY gets the larger size
        int64_t bufSize = (pVal->type == TSDB_DATA_TYPE_VARBINARY) ? pVal->nData * 2 + 2 + 3 : pVal->nData + 3;
        char*   buf = taosMemoryCalloc(bufSize, 1);
        RAW_NULL_CHECK(buf);
        if (dataConverToStr(buf, bufSize, pVal->type, pVal->pData, pVal->nData, NULL) != TSDB_CODE_SUCCESS) {
          taosMemoryFree(buf);
          goto end;
        }
        tvalue = cJSON_CreateString(buf);
        taosMemoryFree(buf);
        RAW_NULL_CHECK(tvalue);
      }

      RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
    }
  }

end:
  taosMemoryFree(jsonText);
  taosArrayDestroy(tagVals);
}
534

535
/*
 * Build the "create" JSON description for one or more child tables.
 *
 * The first request is described directly on the root object. A "createList"
 * array is always added; it is filled with one entry per request (including
 * the first) only when nReqs is greater than 1.
 *
 * If a step fails the function stops early and *pJson receives whatever was
 * built so far (NULL when the root object could not be created).
 */
static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) {
  if (pCreateReq == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  int32_t code = 0;
  cJSON*  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);

  cJSON* typeItem = cJSON_CreateString("create");
  RAW_NULL_CHECK(typeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "type", typeItem));

  cJSON* tableTypeItem = cJSON_CreateString("child");
  RAW_NULL_CHECK(tableTypeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableType", tableTypeItem));

  buildChildElement(root, pCreateReq);

  cJSON* createList = cJSON_CreateArray();
  RAW_NULL_CHECK(createList);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "createList", createList));

  if (nReqs > 1) {
    for (int idx = 0; idx < nReqs; idx++) {
      cJSON* entry = cJSON_CreateObject();
      RAW_NULL_CHECK(entry);
      buildChildElement(entry, &pCreateReq[idx]);
      RAW_FALSE_CHECK(tmqAddJsonArrayItem(createList, entry));
    }
  }

end:
  *pJson = root;
}
567

568
/*
 * Decode the SVCreateTbBatchReq that follows the SMsgHead in `metaRsp` and
 * describe it as JSON. The type of the first request selects the builder:
 * child tables go through buildCreateCTableJson (whole batch), a normal table
 * through buildCreateTableJson (first request only). Any other type, an empty
 * batch or a decode failure leaves *pJson untouched.
 */
static void processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SVCreateTbBatchReq batch = {0};
  SDecoder           dec = {0};

  uDebug("create table data:%p", metaRsp);
  // the encoded batch starts right after the message head
  int32_t payloadLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  void*   payload = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  tDecoderInit(&dec, payload, payloadLen);

  if (tDecodeSVCreateTbBatchReq(&dec, &batch) >= 0 && batch.nReqs > 0) {
    SVCreateTbReq* first = batch.pReqs;
    if (first->type == TSDB_CHILD_TABLE) {
      buildCreateCTableJson(batch.pReqs, batch.nReqs, pJson);
    } else if (first->type == TSDB_NORMAL_TABLE) {
      buildCreateTableJson(&first->ntb.schemaRow, NULL, first->name, first->uid, TSDB_NORMAL_TABLE, &first->colCmpr,
                           pJson);
    }
  }

  uDebug("create table return");
  tDeleteSVCreateTbBatchReq(&batch);
  tDecoderClear(&dec);
}
601

602
/*
 * Decode every auto-create-table request carried by `rsp` and render them as
 * one unformatted JSON string.
 *
 * rsp     data response; createTableReq / createTableLen hold the encoded
 *         requests and their lengths, createTableNum their count
 * string  receives the JSON text produced by cJSON_PrintUnformatted; it is
 *         only assigned when every request was decoded and is a child table,
 *         otherwise *string keeps the value the caller put there
 *
 * All decoders and the per-request allocations released below are cleaned up
 * on every path.
 */
static void processAutoCreateTable(SMqDataRsp* rsp, char** string) {
  if (rsp == NULL || string == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDecoder*      decoder = NULL;
  SVCreateTbReq* pCreateReq = NULL;
  int32_t        code = 0;
  uDebug("auto create table data:%p", rsp);
  if (rsp->createTableNum <= 0) {
    uError("processAutoCreateTable rsp->createTableNum <= 0");
    goto end;
  }

  // zero-initialised arrays, so the cleanup loop is safe for entries never reached
  decoder = taosMemoryCalloc(rsp->createTableNum, sizeof(SDecoder));
  RAW_NULL_CHECK(decoder);
  pCreateReq = taosMemoryCalloc(rsp->createTableNum, sizeof(SVCreateTbReq));
  RAW_NULL_CHECK(pCreateReq);

  // loop to create table
  for (int32_t iReq = 0; iReq < rsp->createTableNum; iReq++) {
    // decode
    void** data = taosArrayGet(rsp->createTableReq, iReq);
    RAW_NULL_CHECK(data);
    int32_t* len = taosArrayGet(rsp->createTableLen, iReq);
    RAW_NULL_CHECK(len);
    tDecoderInit(&decoder[iReq], *data, *len);
    if (tDecodeSVCreateTbReq(&decoder[iReq], pCreateReq + iReq) < 0) {
      goto end;
    }

    if (pCreateReq[iReq].type != TSDB_CHILD_TABLE) {
      uError("processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE");
      goto end;
    }
  }
  cJSON* pJson = NULL;
  buildCreateCTableJson(pCreateReq, rsp->createTableNum, &pJson);
  *string = cJSON_PrintUnformatted(pJson);
  cJSON_Delete(pJson);

end:
  // *string is NULL on the early-exit paths and when printing fails;
  // never hand a NULL pointer to a %s conversion
  uDebug("auto created table return, sql json:%s", (*string != NULL) ? *string : "(null)");
  for (int i = 0; decoder && pCreateReq && i < rsp->createTableNum; i++) {
    tDecoderClear(&decoder[i]);
    taosMemoryFreeClear(pCreateReq[i].comment);
    if (pCreateReq[i].type == TSDB_CHILD_TABLE) {
      taosArrayDestroy(pCreateReq[i].ctb.tagName);
    }
  }
  taosMemoryFree(decoder);
  taosMemoryFree(pCreateReq);
}
655

656
// Append the "colName" and "colType" members for one column to `json`, plus
// "colLength" when `type` is BINARY, VARBINARY, GEOMETRY or NCHAR. The length
// is `bytes` without the var-string header, divided by TSDB_NCHAR_SIZE for
// NCHAR. Returns false as soon as a cJSON node cannot be created or attached.
static bool tmqAlterAddColumnJson(cJSON* json, const char* name, int32_t type, int32_t bytes) {
  cJSON* colName = cJSON_CreateString(name);
  if (colName == NULL || !tmqAddJsonObjectItem(json, "colName", colName)) {
    return false;
  }
  cJSON* colType = cJSON_CreateNumber(type);
  if (colType == NULL || !tmqAddJsonObjectItem(json, "colType", colType)) {
    return false;
  }

  int32_t length = 0;
  if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_GEOMETRY) {
    length = bytes - VARSTR_HEADER_SIZE;
  } else if (type == TSDB_DATA_TYPE_NCHAR) {
    length = (bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
  } else {
    return true;  // other types get no "colLength" member
  }
  cJSON* cbytes = cJSON_CreateNumber(length);
  return cbytes != NULL && tmqAddJsonObjectItem(json, "colLength", cbytes);
}

// Translate the encoded SVAlterTbReq that follows the SMsgHead in
// metaRsp->metaRsp into a cJSON object describing the alteration.
// `*pJson` always receives the object built so far: NULL when decoding or the
// root allocation fails, and a possibly incomplete object when a later step
// fails.
static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDecoder     decoder = {0};
  SVAlterTbReq vAlterTbReq = {0};
  cJSON*       json = NULL;
  int32_t      code = 0;  // assigned by the RAW_*_CHECK macros

  uDebug("alter table data:%p", metaRsp);
  // decode
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
  tDecoderInit(&decoder, data, len);
  if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
    uError("tDecodeSVAlterTbReq error");
    goto end;
  }

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("alter");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
  // tag-value updates are reported as "child", every other action as "normal"
  cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
                                                vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL
                                            ? "child"
                                            : "normal");
  RAW_NULL_CHECK(tableType);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
  cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
  RAW_NULL_CHECK(tableName);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
  cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
  RAW_NULL_CHECK(alterType);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));

  switch (vAlterTbReq.action) {
    case TSDB_ALTER_TABLE_ADD_COLUMN: {
      RAW_FALSE_CHECK(tmqAlterAddColumnJson(json, vAlterTbReq.colName, vAlterTbReq.type, vAlterTbReq.bytes));
      break;
    }
    case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
      RAW_FALSE_CHECK(tmqAlterAddColumnJson(json, vAlterTbReq.colName, vAlterTbReq.type, vAlterTbReq.bytes));
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      break;
    }
    case TSDB_ALTER_TABLE_DROP_COLUMN: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
      RAW_FALSE_CHECK(
          tmqAlterAddColumnJson(json, vAlterTbReq.colName, vAlterTbReq.colModType, vAlterTbReq.colModBytes));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
      RAW_NULL_CHECK(colNewName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
      cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
      RAW_NULL_CHECK(tagName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", tagName));

      bool isNull = vAlterTbReq.isNull;
      // An empty JSON tag is reported as null. The payload is inspected only
      // when a value is flagged as present, and a missing payload is treated
      // as null instead of being dereferenced.
      // NOTE(review): presumably pTagVal stays NULL for a null tag - confirm
      // against tDecodeSVAlterTbReq.
      if (!isNull && vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
        STag* jsonTag = (STag*)vAlterTbReq.pTagVal;
        if (jsonTag == NULL || jsonTag->nTag == 0) isNull = true;
      }
      if (!isNull) {
        char* buf = NULL;

        if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
          if (!tTagIsJson(vAlterTbReq.pTagVal)) {
            uError("processAlterTable isJson false");
            goto end;
          }
          parseTagDatatoJson(vAlterTbReq.pTagVal, &buf, NULL);
          if (buf == NULL) {
            uError("parseTagDatatoJson failed, buf == NULL");
            goto end;
          }
        } else {
          int64_t bufSize = 0;
          if (vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY) {
            bufSize = vAlterTbReq.nTagVal * 2 + 2 + 3;
          } else {
            bufSize = vAlterTbReq.nTagVal + 32;
          }
          buf = taosMemoryCalloc(bufSize, 1);
          RAW_NULL_CHECK(buf);
          if (dataConverToStr(buf, bufSize, vAlterTbReq.tagType, vAlterTbReq.pTagVal, vAlterTbReq.nTagVal, NULL) !=
              TSDB_CODE_SUCCESS) {
            taosMemoryFree(buf);
            goto end;
          }
        }

        // cJSON_CreateString is given buf and buf is freed right after the call
        cJSON* colValue = cJSON_CreateString(buf);
        taosMemoryFree(buf);
        RAW_NULL_CHECK(colValue);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValue", colValue));
      }

      cJSON* isNullCJson = cJSON_CreateBool(isNull);
      RAW_NULL_CHECK(isNullCJson);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValueNull", isNullCJson));
      break;
    }
    case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: {
      int32_t nTags = taosArrayGetSize(vAlterTbReq.pMultiTag);
      if (nTags <= 0) {
        uError("processAlterTable parse multi tags error");
        goto end;
      }

      cJSON* tags = cJSON_CreateArray();
      RAW_NULL_CHECK(tags);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));

      for (int32_t i = 0; i < nTags; i++) {
        cJSON* member = cJSON_CreateObject();
        RAW_NULL_CHECK(member);
        RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, member));

        SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
        RAW_NULL_CHECK(pTagVal);
        cJSON* tagName = cJSON_CreateString(pTagVal->tagName);
        RAW_NULL_CHECK(tagName);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colName", tagName));

        // JSON tags are rejected in the multi-tag form
        if (pTagVal->tagType == TSDB_DATA_TYPE_JSON) {
          uError("processAlterTable isJson false");
          goto end;
        }
        bool isNull = pTagVal->isNull;
        if (!isNull) {
          int64_t bufSize = 0;
          if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
            bufSize = pTagVal->nTagVal * 2 + 2 + 3;
          } else {
            // Same headroom as the single-tag path: the text of a numeric value
            // is longer than its binary width (an 8-byte integer can need 20
            // characters), so nTagVal + 3 was too small.
            bufSize = pTagVal->nTagVal + 32;
          }
          char* buf = taosMemoryCalloc(bufSize, 1);
          RAW_NULL_CHECK(buf);
          if (dataConverToStr(buf, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL) !=
              TSDB_CODE_SUCCESS) {
            taosMemoryFree(buf);
            goto end;
          }
          cJSON* colValue = cJSON_CreateString(buf);
          taosMemoryFree(buf);
          RAW_NULL_CHECK(colValue);
          RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValue", colValue));
        }
        cJSON* isNullCJson = cJSON_CreateBool(isNull);
        RAW_NULL_CHECK(isNullCJson);
        RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValueNull", isNullCJson));
      }
      break;
    }

    case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
      cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
      RAW_NULL_CHECK(colName);
      RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
      RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
      break;
    }
    default:
      break;
  }

end:
  uDebug("alter table return");
  if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
    taosArrayDestroy(vAlterTbReq.pMultiTag);
  }
  tDecoderClear(&decoder);
  *pJson = json;
}
898

899
// Render a "drop super table" meta message as a cJSON object with the members
// "type" ("drop"), "tableType" ("super") and "tableName".
// `*pJson` receives the object built so far; it is NULL when the request
// cannot be decoded or the root object cannot be allocated.
static void processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }

  int32_t      code = 0;  // assigned by the RAW_*_CHECK macros
  cJSON*       root = NULL;
  SVDropStbReq dropReq = {0};
  SDecoder     dec = {0};

  uDebug("processDropSTable data:%p", metaRsp);

  // the encoded request starts right after the SMsgHead
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  tDecoderInit(&dec, body, bodyLen);
  if (tDecodeSVDropStbReq(&dec, &dropReq) < 0) {
    uError("tDecodeSVDropStbReq failed");
    goto end;
  }

  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);

  cJSON* typeItem = cJSON_CreateString("drop");
  RAW_NULL_CHECK(typeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "type", typeItem));

  cJSON* tableTypeItem = cJSON_CreateString("super");
  RAW_NULL_CHECK(tableTypeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableType", tableTypeItem));

  cJSON* tableNameItem = cJSON_CreateString(dropReq.name);
  RAW_NULL_CHECK(tableNameItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableName", tableNameItem));

end:
  uDebug("processDropSTable return");
  tDecoderClear(&dec);
  *pJson = root;
}
937
// Render a delete meta message as a cJSON object with the members "type"
// ("delete") and "sql", where "sql" is a DELETE statement bounded by the
// decoded skey/ekey on the timestamp column.
// `*pJson` receives the object built so far; it is NULL when the request
// cannot be decoded, the statement does not fit the buffer, or the root object
// cannot be allocated.
static void processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (pJson == NULL || metaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  cJSON*     json = NULL;
  int32_t    code = 0;  // assigned by the RAW_*_CHECK macros

  uDebug("processDeleteTable data:%p", metaRsp);
  // decode and process req
  void*   data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);

  tDecoderInit(&coder, data, len);
  if (tDecodeDeleteRes(&coder, &req) < 0) {
    uError("tDecodeDeleteRes failed");
    goto end;
  }

  // Sized for the longest table name, the column name (which appears twice)
  // and 128 bytes for the fixed text and the two 64-bit bounds. The previous
  // 256-byte buffer could be truncated silently, yielding malformed SQL.
  char    sql[TSDB_TABLE_FNAME_LEN + 2 * TSDB_COL_NAME_LEN + 128] = {0};
  int32_t sqlLen = snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                            req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  if (sqlLen < 0 || sqlLen >= (int32_t)sizeof(sql)) {
    uError("processDeleteTable sql truncated, required length:%d", sqlLen);
    goto end;
  }

  json = cJSON_CreateObject();
  RAW_NULL_CHECK(json);
  cJSON* type = cJSON_CreateString("delete");
  RAW_NULL_CHECK(type);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
  cJSON* sqlJson = cJSON_CreateString(sql);
  RAW_NULL_CHECK(sqlJson);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "sql", sqlJson));

end:
  uDebug("processDeleteTable return");
  tDecoderClear(&coder);
  *pJson = json;
}
977

978
// Render a batched drop-table meta message as a cJSON object with the members
// "type" ("drop") and "tableNameList" (one string per table in the batch).
// `*pJson` receives the object built so far; it is NULL when the request
// cannot be decoded or the root object cannot be allocated.
static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
  if (metaRsp == NULL || pJson == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }

  int32_t          code = 0;  // assigned by the RAW_*_CHECK macros
  cJSON*           root = NULL;
  SVDropTbBatchReq batch = {0};
  SDecoder         dec = {0};

  uDebug("processDropTable data:%p", metaRsp);

  // the encoded request starts right after the SMsgHead
  int32_t bodyLen = metaRsp->metaRspLen - sizeof(SMsgHead);
  void*   body = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
  tDecoderInit(&dec, body, bodyLen);
  if (tDecodeSVDropTbBatchReq(&dec, &batch) < 0) {
    uError("tDecodeSVDropTbBatchReq failed");
    goto end;
  }

  root = cJSON_CreateObject();
  RAW_NULL_CHECK(root);

  cJSON* typeItem = cJSON_CreateString("drop");
  RAW_NULL_CHECK(typeItem);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "type", typeItem));

  cJSON* nameList = cJSON_CreateArray();
  RAW_NULL_CHECK(nameList);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(root, "tableNameList", nameList));

  for (int32_t idx = 0; idx < batch.nReqs; ++idx) {
    cJSON* nameItem = cJSON_CreateString(batch.pReqs[idx].name);
    RAW_NULL_CHECK(nameItem);
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(nameList, nameItem));
  }

end:
  uDebug("processDropTable return");
  tDecoderClear(&dec);
  *pJson = root;
}
1019

1020
// Replay a raw "create super table" meta message on connection `taos`.
// `meta` holds an SMsgHead followed by an encoded SVCreateStbReq; `metaLen` is
// the size of the whole buffer. The request is converted to an SMCreateStbReq
// (igExists set, source TD_REQ_FROM_TAOX) and sent as TDMT_MND_CREATE_STB.
// On success the table's entry is removed from the catalog cache.
// Returns TSDB_CODE_SUCCESS or an error code.
static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateStbReq req = {0};
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // a buffer shorter than the header would make the unsigned subtraction below wrap
  if (metaLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // without compression info in the message, every column gets its type's default
  int8_t           createDefaultCompress = 0;
  SColCmprWrapper* p = &req.colCmpr;
  if (p->nCols == 0) {
    createDefaultCompress = 1;
  }
  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
  RAW_NULL_CHECK(pReq.pColumns);
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema*          pSchema = req.schemaRow.pSchema + i;
    SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);

    // also fall back to the default when the compression list is shorter than
    // the schema, rather than reading past its end
    if (createDefaultCompress || i >= p->nCols) {
      field.compress = createDefaultColCmprByType(pSchema->type);
    } else {
      SColCmpr* pCmp = &req.colCmpr.pColCmpr[i];
      field.compress = pCmp->alg;
    }
    if (req.pExtSchemas) field.typeMod = req.pExtSchemas[i].typeMod;
    RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field));
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  RAW_NULL_CHECK(pReq.pTags);
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
    RAW_NULL_CHECK(taosArrayPush(pReq.pTags, &field));
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  pReq.suid = processSuid(req.suid, pRequest->pDb);
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;

  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // first call measures the serialized size, second call fills the buffer
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  if (pCmdMsg.msgLen <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  if (tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    taosMemoryFree(pCmdMsg.pMsg);
    goto end;
  }

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest

  taosMemoryFree(pCmdMsg.pMsg);

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SCatalog* pCatalog = NULL;
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  return code;
}
1134

1135
// Replay a raw "drop super table" meta message on connection `taos`.
// `meta` holds an SMsgHead followed by an encoded SVDropStbReq; `metaLen` is
// the size of the whole buffer. The super table is looked up in the catalog to
// obtain its uid; a table that does not exist is treated as success.
// Otherwise a TDMT_MND_DROP_STB request (igNotExists set, source
// TD_REQ_FROM_TAOX) is sent and, on success, the catalog entry is removed.
// Returns TSDB_CODE_SUCCESS or an error code.
static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropStbReq req = {0};
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  SRequestObj* pRequest = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // a buffer shorter than the header would make the unsigned subtraction below wrap
  if (metaLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropStbReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  // one name serves the catalog lookup, the request and the cache eviction
  SName tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  STableMeta* pTableMeta = NULL;
  code = catalogGetTableMeta(pCatalog, &conn, &tableName, &pTableMeta);
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    // nothing to drop
    code = TSDB_CODE_SUCCESS;
    taosMemoryFreeClear(pTableMeta);
    goto end;
  }
  if (code != TSDB_CODE_SUCCESS) {
    goto end;
  }
  // use the uid known to the catalog on this side, not the suid carried in the message
  pReq.suid = pTableMeta->uid;
  taosMemoryFreeClear(pTableMeta);

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;

  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  if (tNameExtractFullName(&tableName, pReq.name) != 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  SCmdMsgInfo pCmdMsg = {0};
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  // first call measures the serialized size, second call fills the buffer
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  if (pCmdMsg.msgLen <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  if (tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) <= 0) {
    code = TSDB_CODE_INVALID_PARA;
    taosMemoryFree(pCmdMsg.pMsg);
    goto end;
  }

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
  taosMemoryFree(pCmdMsg.pMsg);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // a failure to evict the cached meta is reported to the caller
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  return code;
}
1236

1237
typedef struct SVgroupCreateTableBatch {
1238
  SVCreateTbBatchReq req;
1239
  SVgroupInfo        info;
1240
  char               dbName[TSDB_DB_NAME_LEN];
1241
} SVgroupCreateTableBatch;
1242

1243
static void destroyCreateTbReqBatch(void* data) {
×
1244
  if (data == NULL) {
×
1245
    uError("invalid parameter in %s", __func__);
×
1246
    return;
×
1247
  }
1248
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
×
1249
  taosArrayDestroy(pTbBatch->req.pArray);
×
1250
}
1251

1252
// Replay a raw "create table" batch meta message on connection `taos`.
// `meta` holds an SMsgHead followed by an encoded SVCreateTbBatchReq;
// `metaLen` is the size of the whole buffer.
// Each request is routed to the vgroup its table name hashes to. For child
// tables the super table is looked up in the catalog: the request is skipped
// when the super table does not exist, otherwise its suid and tag column ids
// are rewritten to the ones known to the catalog. The per-vgroup batches are
// then serialized and executed as a TDMT_VND_CREATE_TABLE query.
// Returns TSDB_CODE_SUCCESS or an error code.
static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;
  // holds the rebuilt tags so they can be released after the query has run
  SArray* pTagList = taosArrayInit(0, POINTER_BYTES);
  RAW_NULL_CHECK(pTagList);
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // a buffer shorter than the header would make the unsigned subtraction below wrap
  if (metaLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }

    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
    // change tag cid to new cid
    if (pCreateReq->type == TSDB_CHILD_TABLE) {
      STableMeta* pTableMeta = NULL;
      SName       sName = {0};
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        // the super table does not exist: skip this child table
        code = TSDB_CODE_SUCCESS;
        taosMemoryFreeClear(pTableMeta);
        continue;
      }

      if (code != TSDB_CODE_SUCCESS) {
        goto end;
      }
      pCreateReq->ctb.suid = pTableMeta->uid;

      SArray* pTagVals = NULL;
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
      if (code != TSDB_CODE_SUCCESS) {
        taosMemoryFreeClear(pTableMeta);
        goto end;
      }

      // match each tag by name against the catalog schema (tag columns follow
      // the data columns in pTableMeta->schema) and adopt its column id
      bool rebuildTag = false;
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
        if (tName == NULL) {
          continue;
        }
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
          SSchema* tag = &pTableMeta->schema[j];
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
            if (pTagVal) {
              if (pTagVal->cid != tag->colId) {
                pTagVal->cid = tag->colId;
                rebuildTag = true;
              }
            } else {
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
            }
          }
        }
      }
      taosMemoryFreeClear(pTableMeta);
      if (rebuildTag) {
        STag* ppTag = NULL;
        code = tTagNew(pTagVals, 1, false, &ppTag);
        taosArrayDestroy(pTagVals);
        pTagVals = NULL;
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        if (NULL == taosArrayPush(pTagList, &ppTag)) {
          // report the failure; previously code was still TSDB_CODE_SUCCESS here
          code = terrno;
          tTagFree(ppTag);
          goto end;
        }
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
      }
      taosArrayDestroy(pTagVals);
    }
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // the array is not in the hash table yet, so release it here on failure
      if (taosArrayPush(tBatch.req.pArray, pCreateReq) == NULL) {
        code = terrno;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      tBatch.req.source = TD_REQ_FROM_TAOX;
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != 0) {
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
    }
  }

  // every request was skipped: nothing to send
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  // NOTE(review): pBufArray is not released on the error paths between here
  // and rewriteToVnodeModifyOpStmt - confirm ownership and the matching
  // destructor.
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_NULL_CHECK(pQuery->pRoot);

  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }

  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteSVCreateTbBatchReq(&req);

  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  taosArrayDestroyP(pTagList, NULL);
  return code;
}
1430

1431
typedef struct SVgroupDropTableBatch {
1432
  SVDropTbBatchReq req;
1433
  SVgroupInfo      info;
1434
  char             dbName[TSDB_DB_NAME_LEN];
1435
} SVgroupDropTableBatch;
1436

1437
static void destroyDropTbReqBatch(void* data) {
×
1438
  if (data == NULL) {
×
1439
    uError("invalid parameter in %s", __func__);
×
1440
    return;
×
1441
  }
1442
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
×
1443
  taosArrayDestroy(pTbBatch->req.pArray);
×
1444
}
1445

1446
/*
 * Replays a serialized drop-table batch against the database bound to `taos`.
 *
 * The payload is an SMsgHead followed by an encoded SVDropTbBatchReq. Every
 * request is routed to its vgroup, the per-vgroup batches are serialized and
 * executed as one TDMT_VND_DROP_TABLE query, and on success the cached meta of
 * the dropped tables is removed.
 *
 * @param taos     connection handle (must not be NULL)
 * @param meta     raw message: SMsgHead + encoded SVDropTbBatchReq (must not be NULL)
 * @param metaLen  length of `meta` in bytes, including the SMsgHead
 * @return TSDB_CODE_SUCCESS or an error code. Tables that no longer exist are
 *         skipped, and an empty resulting batch is reported as success.
 */
static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // the payload must at least hold the message head skipped below, otherwise
  // the unsigned length computation would wrap around
  if (metaLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));

  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop over the tables to drop and group them by vgroup
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      // nothing to drop on this side; not an error
      code = TSDB_CODE_SUCCESS;
      taosMemoryFreeClear(pTableMeta);
      continue;
    }
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
    // replace the suid carried in the message with the one known locally
    tb_uid_t oldSuid = pDropReq->suid;
    pDropReq->suid = pTableMeta->suid;
    taosMemoryFreeClear(pTableMeta);
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
           pDropReq->suid);

    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      // until the batch is stored in the hash, the array is owned here and
      // must be released on every failure path
      if (taosArrayPush(tBatch.req.pArray, pDropReq) == NULL) {
        code = terrno;
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != 0) {
        taosArrayDestroy(tBatch.req.pArray);
        goto end;
      }
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
    }
  }

  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  // NOTE(review): who releases pBufArray when one of the calls below fails is
  // not visible from this function -- confirm it cannot leak.
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }
  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  return code;
}
1561

1562
/*
 * Replays a serialized delete result as a "delete from ... where ts between"
 * statement on the connection `taos`.
 *
 * @param taos     connection handle (must not be NULL)
 * @param meta     raw message: SMsgHead + encoded SDeleteRes (must not be NULL)
 * @param metaLen  length of `meta` in bytes, including the SMsgHead
 * @return TSDB_CODE_SUCCESS or an error code. TSDB_CODE_PAR_TABLE_NOT_EXIST and
 *         TSDB_CODE_PAR_GET_META_ERROR from the query are mapped to success.
 *         TSDB_CODE_INVALID_PARA is returned for a short payload, a decode
 *         failure, or a statement that does not fit the SQL buffer.
 */
static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  char       sql[256] = {0};
  int32_t    code = TSDB_CODE_SUCCESS;

  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);

  // the payload must at least hold the message head skipped below, otherwise
  // the unsigned length computation would wrap around
  if (metaLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  if (tDecodeDeleteRes(&coder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  int sqlLen = snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                        req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  // never run a truncated statement: a clipped time bound would delete the wrong range
  if (sqlLen < 0 || (size_t)sqlLen >= sizeof(sql)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
  RAW_NULL_CHECK(res);
  SRequestObj* pRequest = (SRequestObj*)res;
  code = pRequest->code;
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);

end:
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));
  tDecoderClear(&coder);
  return code;
}
1600

1601
/*
 * Replays a serialized alter-table request against the database bound to `taos`.
 *
 * The payload is an SMsgHead followed by an encoded SVAlterTbReq. The request
 * is re-encoded with source TD_REQ_FROM_TAOX, addressed to the vgroup that
 * owns the table and executed as a TDMT_VND_ALTER_TABLE query.
 *
 * @param taos     connection handle (must not be NULL)
 * @param meta     raw message: SMsgHead + encoded SVAlterTbReq (must not be NULL)
 * @param metaLen  length of `meta` in bytes, including the SMsgHead
 * @return TSDB_CODE_SUCCESS or an error code. TSDB_ALTER_TABLE_UPDATE_OPTIONS
 *         requests are ignored, and TSDB_CODE_TDB_TABLE_NOT_EXIST from the
 *         query is mapped to success.
 */
static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVAlterTbReq   req = {0};
  SDecoder       dcoder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;
  SVgDataBlocks* pVgData = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // the payload must at least hold the message head skipped below, otherwise
  // the unsigned length computation would wrap around
  if (metaLen < sizeof(SMsgHead)) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);
  if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }

  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  SVgroupInfo pInfo = {0};
  SName       pName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));
  pArray = taosArrayInit(1, sizeof(void*));
  RAW_NULL_CHECK(pArray);

  pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
  RAW_NULL_CHECK(pVgData);
  pVgData->vg = pInfo;

  // first pass only measures the encoded size
  int tlen = 0;
  req.source = TD_REQ_FROM_TAOX;
  tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
  if (code != 0) {
    code = terrno;
    goto end;
  }
  tlen += sizeof(SMsgHead);
  void* pMsg = taosMemoryMalloc(tlen);
  RAW_NULL_CHECK(pMsg);
  // hand the buffer to pVgData right away so the cleanup at `end` releases it
  // even when encoding fails
  pVgData->pData = pMsg;
  ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
  ((SMsgHead*)pMsg)->contLen = htonl(tlen);
  void*    pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  SEncoder coder = {0};
  tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
  code = tEncodeSVAlterTbReq(&coder, &req);
  if (code != 0) {
    tEncoderClear(&coder);
    code = terrno;
    goto end;
  }
  tEncoderClear(&coder);

  pVgData->size = tlen;

  pVgData->numOfTables = 1;
  RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));

  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code || NULL == pQuery) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);

  // from here on the local references are dropped so the cleanup at `end`
  // does not release them
  pVgData = NULL;
  pArray = NULL;
  code = pRequest->code;
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
    code = TSDB_CODE_SUCCESS;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
    if (pRes->res != NULL) {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
end:
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));
  taosArrayDestroy(pArray);
  if (pVgData) taosMemoryFreeClear(pVgData->pData);
  taosMemoryFreeClear(pVgData);
  destroyRequest(pRequest);
  tDecoderClear(&dcoder);
  qDestroyQuery(pQuery);
  return code;
}
1719

1720
/*
 * Convenience wrapper around taos_write_raw_block_with_fields_with_reqid()
 * that passes 0 as the request id.
 */
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
                                     int numFields) {
  const int64_t reqid = 0;
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, reqid);
}
1724

1725
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
×
1726
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
1727
  if (taos == NULL || pData == NULL || tbname == NULL) {
×
1728
    uError("invalid parameter in %s", __func__);
×
1729
    return TSDB_CODE_INVALID_PARA;
×
1730
  }
1731
  int32_t     code = TSDB_CODE_SUCCESS;
×
1732
  STableMeta* pTableMeta = NULL;
×
1733
  SQuery*     pQuery = NULL;
×
1734
  SHashObj*   pVgHash = NULL;
×
1735

1736
  SRequestObj* pRequest = NULL;
×
1737
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
×
1738

1739
  uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
×
1740
         rows, pData, tbname, fields, numFields);
1741

1742
  pRequest->syncQuery = true;
×
1743
  if (!pRequest->pDb) {
×
1744
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1745
    goto end;
×
1746
  }
1747

1748
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
×
1749
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
×
1750
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
×
1751

1752
  struct SCatalog* pCatalog = NULL;
×
1753
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
×
1754

1755
  SRequestConnInfo conn = {0};
×
1756
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
×
1757
  conn.requestId = pRequest->requestId;
×
1758
  conn.requestObjRefId = pRequest->self;
×
1759
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
×
1760

1761
  SVgroupInfo vgData = {0};
×
1762
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
×
1763
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
×
1764
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
×
1765
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
×
1766
  RAW_NULL_CHECK(pVgHash);
×
1767
  RAW_RETURN_CHECK(
×
1768
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1769
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
×
1770
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
×
1771

1772
  launchQueryImpl(pRequest, pQuery, true, NULL);
×
1773
  code = pRequest->code;
×
1774

1775
end:
×
1776
  uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code));
×
1777
  taosMemoryFreeClear(pTableMeta);
×
1778
  qDestroyQuery(pQuery);
×
1779
  destroyRequest(pRequest);
×
1780
  taosHashCleanup(pVgHash);
×
1781
  return code;
×
1782
}
1783

1784
/*
 * Convenience wrapper around taos_write_raw_block_with_reqid() that passes 0
 * as the request id.
 */
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  const int64_t reqid = 0;
  return taos_write_raw_block_with_reqid(taos, rows, pData, tbname, reqid);
}
1787

1788
/*
 * Writes one raw data block into table `tbname` of the connection's current
 * database. No field list is passed to rawBlockBindData() (NULL / 0).
 *
 * @param taos    connection handle (must not be NULL)
 * @param rows    row count; only logged by this function
 * @param pData   raw block (must not be NULL)
 * @param tbname  target table name (must not be NULL)
 * @param reqid   request id forwarded to buildRequest()
 * @return TSDB_CODE_SUCCESS or an error code.
 */
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
  if (taos == NULL || pData == NULL || tbname == NULL) {
    // log like the other entry points in this file do on bad arguments
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t     code = TSDB_CODE_SUCCESS;
  STableMeta* pTableMeta = NULL;
  SQuery*     pQuery = NULL;
  SHashObj*   pVgHash = NULL;

  SRequestObj* pRequest = NULL;
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));

  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  // fully qualified name of the target table
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));

  struct SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));

  SRequestConnInfo conn = {0};
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
  conn.requestId = pRequest->requestId;
  conn.requestObjRefId = pRequest->self;
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  // resolve where the table lives and what it looks like
  SVgroupInfo vgData = {0};
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgHash);
  RAW_RETURN_CHECK(
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false, NULL, 0, false));
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  code = pRequest->code;

end:
  uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  taosMemoryFreeClear(pTableMeta);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  taosHashCleanup(pVgHash);
  return code;
}
1843

1844
static void* getRawDataFromRes(void* pRetrieve) {
×
1845
  if (pRetrieve == NULL) {
×
1846
    uError("invalid parameter in %s", __func__);
×
1847
    return NULL;
×
1848
  }
1849
  void* rawData = NULL;
×
1850
  // deal with compatibility
1851
  if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
×
1852
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
1853
  } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
×
1854
             *(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
×
1855
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
×
1856
  }
1857
  return rawData;
×
1858
}
1859

1860
/*
 * Decodes every create-table request carried in `rsp` and stores it in
 * `pHashObj`, keyed by table name. Only the first request seen for a name is
 * kept; later duplicates are destroyed.
 *
 * @param rsp       response holding createTableReq / createTableLen arrays
 * @param pHashObj  destination hash; receives a shallow copy of each SVCreateTbReq
 * @return 0 on success, TSDB_CODE_INVALID_MSG when a request is not for a
 *         child table, or the error of the failing step.
 */
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
  if (rsp == NULL || pHashObj == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // find schema data info
  int32_t       code = 0;
  SVCreateTbReq pCreateReq = {0};
  SDecoder      decoderTmp = {0};

  for (int j = 0; j < rsp->createTableNum; j++) {
    void** dataTmp = taosArrayGet(rsp->createTableReq, j);
    RAW_NULL_CHECK(dataTmp);
    int32_t* lenTmp = taosArrayGet(rsp->createTableLen, j);
    RAW_NULL_CHECK(lenTmp);

    tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));

    if (pCreateReq.type != TSDB_CHILD_TABLE) {
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }
    if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL) {
      RAW_RETURN_CHECK(
          taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq)));
    } else {
      tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
    }
    // Either the hash entry now shares the request's members or they were
    // just destroyed. Clear the local copy so a later failure cannot destroy
    // them a second time at `end`.
    pCreateReq = (SVCreateTbReq){0};

    tDecoderClear(&decoderTmp);
  }
  return 0;

end:
  tDecoderClear(&decoderTmp);
  tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
  return code;
}
1900

1901
/*
 * States for the one-time initialisation of the raw-write cache.
 * NOTE(review): the transitions are driven by code outside this part of the
 * file; the per-value notes below are inferred from the names only.
 */
typedef enum {
  WRITE_RAW_INIT_START = 0,  // initial value of initedFlag
  WRITE_RAW_INIT_OK,         // presumably: initialisation succeeded
  WRITE_RAW_INIT_FAIL,       // presumably: initialisation failed
} WRITE_RAW_INIT_STATUS;
1906

1907
static SHashObj* writeRawCache = NULL;
1908
static int8_t    initFlag = 0;
1909
static int8_t    initedFlag = WRITE_RAW_INIT_START;
1910

1911
typedef struct {
1912
  SHashObj* pVgHash;
1913
  SHashObj* pNameHash;
1914
  SHashObj* pMetaHash;
1915
} rawCacheInfo;
1916

1917
typedef struct {
1918
  SVgroupInfo vgInfo;
1919
  int64_t     uid;
1920
  int64_t     suid;
1921
} tbInfo;
1922

1923
static void tmqFreeMeta(void* data) {
×
1924
  if (data == NULL) {
×
1925
    uError("invalid parameter in %s", __func__);
×
1926
    return;
×
1927
  }
1928
  STableMeta* pTableMeta = *(STableMeta**)data;
×
1929
  taosMemoryFree(pTableMeta);
×
1930
}
1931

1932
static void freeRawCache(void* data) {
×
1933
  if (data == NULL) {
×
1934
    uError("invalid parameter in %s", __func__);
×
1935
    return;
×
1936
  }
1937
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
1938
  taosHashCleanup(pRawCache->pMetaHash);
×
1939
  taosHashCleanup(pRawCache->pNameHash);
×
1940
  taosHashCleanup(pRawCache->pVgHash);
×
1941
}
1942

1943
static int32_t initRawCacheHash() {
×
1944
  if (writeRawCache == NULL) {
×
1945
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
×
1946
    if (writeRawCache == NULL) {
×
1947
      return terrno;
×
1948
    }
1949
    taosHashSetFreeFp(writeRawCache, freeRawCache);
×
1950
  }
1951
  return 0;
×
1952
}
1953

1954
/*
 * Decides whether the cached table meta is out of date with respect to a raw
 * data block and the schema that came with it.
 *
 * @param rawData     raw block; its column schema section is read
 * @param pTableMeta  cached table meta to validate
 * @param pSW         schema wrapper describing the block's columns
 * @return true when the column counts differ, a block column has no column of
 *         the same name in the meta, or checkSchema() rejects a matched
 *         column; false otherwise, including when any argument is NULL.
 */
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
  if (rawData == NULL || pSW == NULL) {
    return false;
  }
  if (pTableMeta == NULL) {
    uError("invalid parameter in %s", __func__);
    return false;
  }
  char* p = (char*)rawData;
  // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
  // column length |
  // skip the five int32 header fields and the 64-bit block group id
  p += sizeof(int32_t);
  p += sizeof(int32_t);
  p += sizeof(int32_t);
  p += sizeof(int32_t);
  p += sizeof(int32_t);
  p += sizeof(uint64_t);
  // explicit cast: char* and int8_t* are distinct pointer types
  int8_t* fields = (int8_t*)p;

  if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
    return true;
  }

  for (int i = 0; i < pSW->nCols; i++) {
    char* fieldName = pSW->pSchema[i].name;
    int   j = 0;
    // look for the meta column carrying the same name as block column i
    for (; j < pTableMeta->tableInfo.numOfColumns; j++) {
      SSchema*    pColSchema = &pTableMeta->schema[j];
      SSchemaExt* pColExtSchema = &pTableMeta->schemaExt[j];

      if (strcmp(pColSchema->name, fieldName) == 0) {
        if (checkSchema(pColSchema, pColExtSchema, fields, NULL, 0) != 0) {
          return true;
        }
        break;
      }
    }
    // each column schema entry in the block is one int8 followed by one int32
    fields += sizeof(int8_t) + sizeof(int32_t);

    // ran off the end of the meta columns: the name is unknown to the cache
    if (j == pTableMeta->tableInfo.numOfColumns) return true;
  }
  return false;
}
1997

1998
/*
 * Returns the vgroup / name / meta hashes cached in writeRawCache for `key`,
 * creating and registering a fresh set when none exists yet.
 *
 * @param pVgHash    out: vgroup hash
 * @param pNameHash  out: table-name hash
 * @param pMetaHash  out: table-meta hash (entries released by tmqFreeMeta)
 * @param key        cache key; the pointer value itself is what gets hashed
 * @return 0 on success. On failure the error code is returned, any hash
 *         created by this call is cleaned up and all three outputs are NULL.
 */
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  void*   cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
  if (cacheInfo == NULL) {
    // start from a known state so the cleanup at `end` only ever touches
    // hashes created by this call
    *pVgHash = NULL;
    *pNameHash = NULL;
    *pMetaHash = NULL;

    *pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(*pVgHash);
    *pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(*pNameHash);
    *pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(*pMetaHash);
    taosHashSetFreeFp(*pMetaHash, tmqFreeMeta);
    rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash};
    RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));
  } else {
    rawCacheInfo* info = (rawCacheInfo*)cacheInfo;
    *pVgHash = info->pVgHash;
    *pNameHash = info->pNameHash;
    *pMetaHash = info->pMetaHash;
  }

  return 0;
end:
  taosHashCleanup(*pMetaHash);
  taosHashCleanup(*pNameHash);
  taosHashCleanup(*pVgHash);
  // do not hand dangling pointers back to the caller
  *pMetaHash = NULL;
  *pNameHash = NULL;
  *pVgHash = NULL;
  return code;
}
2029

2030
/*
 * Shared setup for the raw-write entry points: builds a synchronous request on
 * `taos`, checks that a database is selected, fetches the catalog handle and
 * fills the connection info used for catalog calls.
 *
 * @param taos      connection handle
 * @param pRequest  out: the request object that was built
 * @param pCatalog  out: catalog handle of the connection's cluster
 * @param conn      out: transport, request id, request ref id and mgmt ep set
 * @return 0 on success, TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no
 *         database, or the error of the failing step.
 */
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
  if (taos == NULL || pRequest == NULL || pCatalog == NULL || conn == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = 0;
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));

  SRequestObj* pReq = *pRequest;
  pReq->syncQuery = true;
  if (pReq->pDb == NULL) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  RAW_RETURN_CHECK(catalogGetHandle(pReq->pTscObj->pAppInfo->clusterId, pCatalog));

  conn->pTrans = pReq->pTscObj->pAppInfo->pTransporter;
  conn->requestId = pReq->requestId;
  conn->requestObjRefId = pReq->self;
  conn->mgmtEps = getEpSet_s(&pReq->pTscObj->pAppInfo->mgmtEp);

end:
  return code;
}
2052

2053
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
2054
static int32_t  decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func,
×
2055
                              SMqRspObj* rspObj) {
2056
  if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) {
×
2057
    uError("invalid parameter in %s", __func__);
×
2058
    return TSDB_CODE_INVALID_PARA;
×
2059
  }
2060
  int8_t dataVersion = *(int8_t*)data;
×
2061
  if (dataVersion >= MQ_DATA_RSP_VERSION) {
×
2062
    data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
×
2063
    if (dataLen < sizeof(int8_t) + sizeof(int32_t)) {
×
2064
      return TSDB_CODE_INVALID_PARA;
×
2065
    }
2066
    dataLen -= sizeof(int8_t) + sizeof(int32_t);
×
2067
  }
2068

2069
  rspObj->resIter = -1;
×
2070
  tDecoderInit(decoder, data, dataLen);
×
2071
  int32_t code = func(decoder, &rspObj->dataRsp);
×
2072
  if (code != 0) {
×
2073
    SET_ERROR_MSG("decode mq taosx data rsp failed");
×
2074
  }
2075
  return code;
×
2076
}
2077

2078
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
×
2079
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
2080
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
2081
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
×
2082
      pMeta == NULL) {
2083
    uError("invalid parameter in %s", __func__);
×
2084
    return TSDB_CODE_INVALID_PARA;
×
2085
  }
2086
  int32_t     code = 0;
×
2087
  STableMeta* pTableMeta = NULL;
×
2088
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
×
2089
  if (tmpInfo == NULL || retry > 0) {
×
2090
    tbInfo info = {0};
×
2091

2092
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
×
2093
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
×
2094
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
×
2095
    }
2096
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
×
2097
    info.uid = pTableMeta->uid;
×
2098
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
×
2099
      info.suid = pTableMeta->suid;
×
2100
    } else {
2101
      info.suid = pTableMeta->uid;
×
2102
    }
2103
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
×
2104
    if (code != 0) {
×
2105
      taosMemoryFree(pTableMeta);
×
2106
      goto end;
×
2107
    }
2108
    uDebug("put table meta to hash1, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d", info.suid, taosHashGetSize(pMetaHash),
×
2109
           taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2110
    if (pCreateReqDst) {
×
2111
      pTableMeta->vgId = info.vgInfo.vgId;
×
2112
      pTableMeta->uid = pCreateReqDst->uid;
×
2113
      pCreateReqDst->ctb.suid = pTableMeta->suid;
×
2114
    }
2115

2116
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
×
2117
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
×
2118
    RAW_RETURN_CHECK(
×
2119
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
2120
  }
2121

2122
  if (pTableMeta == NULL || retry > 0) {
×
2123
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
×
2124
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
×
2125
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
×
2126
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
×
2127
      if (code != 0) {
×
2128
        taosMemoryFree(pTableMeta);
×
2129
        goto end;
×
2130
      }
2131
      uDebug("put table meta to hash2, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d", tmpInfo->suid, taosHashGetSize(pMetaHash),
×
2132
      taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
2133
    } else {
2134
      pTableMeta = *pTableMetaTmp;
×
2135
      pTableMeta->uid = tmpInfo->uid;
×
2136
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
×
2137
    }
2138
  }
2139
  *pMeta = pTableMeta;
×
2140

2141
end:
×
2142
  return code;
×
2143
}
2144

2145
/*
 * Writes the data blocks of a raw tmq data message into the database bound to
 * `taos`.
 *
 * The message is decoded with tDecodeMqDataRsp, every block is bound to the
 * meta of its table (via the per-connection cache) and all blocks are executed
 * as one query. When the query fails with an error for which
 * NEED_CLIENT_HANDLE_ERROR() holds, the whole pass is repeated, up to three
 * times, with the cache forced to refresh.
 *
 * @param taos     connection handle (must not be NULL)
 * @param data     raw message (must not be NULL)
 * @param dataLen  length of `data` in bytes
 * @return TSDB_CODE_SUCCESS or an error code. A message without schema stops
 *         processing at the first block and leaves `code` untouched.
 */
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};
  SQuery*          pQuery = NULL;
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));

  // per-connection caches; owned by writeRawCache, not released here
  SHashObj* pVgHash = NULL;
  SHashObj* pNameHash = NULL;
  SHashObj* pMetaHash = NULL;
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  int retry = 0;
  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);

    // bind every block of the message to its table
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      if (!rspObj.dataRsp.withSchema) {
        goto end;
      }

      const int32_t blockIdx = rspObj.resIter;

      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, blockIdx);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, blockIdx);
      RAW_NULL_CHECK(pSW);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, blockIdx);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, pSW,
                                        rawData, retry));

      char err[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // done unless the error is one the client should handle and retries remain
    if (!(NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3)) {
      break;
    }

    uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;  // restart from the first block
  }

end:
  uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  return code;
}
2221

2222
/*
 * Writes a raw "metadata" payload into the connection `taos`.
 *
 * The payload is decoded with tDecodeSTaosxRsp. buildCreateTbMap() fills a
 * hash that is looked up by table name to find the create-table request that
 * accompanies a data block. Every block is bound into a fresh query with
 * rawBlockBindData() and the query is submitted through launchQueryImpl().
 * While NEED_CLIENT_HANDLE_ERROR(code) holds, the whole batch is rebuilt and
 * resubmitted, at most three extra times.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered. A payload
 * whose `withSchema` flag is unset leaves the loop through `end` without
 * changing `code`.
 */
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (data == NULL || taos == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = TSDB_CODE_SUCCESS;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};
  SQuery*          pQuery = NULL;
  SHashObj*        pCreateTbHash = NULL;

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeSTaosxRsp, &rspObj));

  // table name -> SVCreateTbReq, released entry by entry in the cleanup section
  pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  RAW_NULL_CHECK(pCreateTbHash);
  RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));

  SHashObj* pVgHash = NULL;
  SHashObj* pNameHash = NULL;
  SHashObj* pMetaHash = NULL;
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  int attempt = 0;
  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw meta data block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);

    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      if (!rspObj.dataRsp.withSchema) {
        goto end;
      }

      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      SSchemaWrapper* pSchema = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
      RAW_NULL_CHECK(pSchema);
      void* pBlock = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pBlock);
      void* pRaw = getRawDataFromRes(pBlock);
      RAW_NULL_CHECK(pRaw);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);

      SName name = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(name.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(name.tname, tbName, TSDB_TABLE_NAME_LEN);

      // create-table request that travelled with this block, if any
      SVCreateTbReq* pCreateReq = (SVCreateTbReq*)taosHashGet(pCreateTbHash, name.tname, strlen(name.tname));
      STableMeta*    pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReq, pCatalog, &conn, &name,
                                        &pTableMeta, pSchema, pRaw, attempt));

      char errBuf[ERR_MSG_LEN] = {0};
      code = rawBlockBindData(pQuery, pTableMeta, pRaw, pCreateReq, pSchema, pSchema->nCols, true, errBuf,
                              ERR_MSG_LEN, true);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", name.tname, errBuf);
        goto end;
      }
    }

    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // the counter is only advanced when the error is one the client may retry
    bool again = NEED_CLIENT_HANDLE_ERROR(code) && attempt++ < 3;
    if (!again) {
      break;
    }

    uInfo("write raw retry:%d/3 end code:%d, msg:%s", attempt, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;  // rewind so the next attempt walks every block again
  }

end:
  uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteSTaosxRsp(&rspObj.dataRsp);
  for (void* pIter = taosHashIterate(pCreateTbHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pCreateTbHash, pIter)) {
    tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
  }
  taosHashCleanup(pCreateTbHash);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  destroyRequest(pRequest);
  return code;
}
2313

2314
/*
 * Writes a raw "rawdata" payload into the connection `taos`.
 *
 * The payload is decoded with tDecodeMqDataRsp. For every block the table
 * meta is resolved through processCacheMeta() and the block is attached with
 * rawBlockBindRawData(), which uses a hash (pVgroupHash) that is created for
 * each attempt and destroyed before the output is built. The query is then
 * submitted through launchQueryImpl(). While NEED_CLIENT_HANDLE_ERROR(code)
 * holds, the whole batch is rebuilt and resubmitted, at most three extra
 * times.
 *
 * Returns TSDB_CODE_SUCCESS or the first error code encountered.
 */
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t   code = TSDB_CODE_SUCCESS;
  SQuery*   pQuery = NULL;
  SHashObj* pVgroupHash = NULL;
  SMqRspObj rspObj = {0};
  SDecoder  decoder = {0};

  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SRequestConnInfo conn = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));

  SHashObj* pVgHash = NULL;
  SHashObj* pNameHash = NULL;
  SHashObj* pMetaHash = NULL;
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
  int retry = 0;
  while (1) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
    pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(pVgroupHash);
    pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
    RAW_NULL_CHECK(pStmt->pVgDataBlocks);

    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pRetrieve);
      void* rawData = getRawDataFromRes(pRetrieve);
      RAW_NULL_CHECK(rawData);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // find schema data info
      STableMeta* pTableMeta = NULL;
      RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, &pTableMeta, NULL,
                                        NULL, retry));
      code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
      if (code != TSDB_CODE_SUCCESS) {
        // rawBlockBindRawData() has no error-text output, so report the code's
        // description instead of an always-empty buffer.
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, tstrerror(code));
        goto end;
      }
    }
    // The hash is only needed while binding; drop it before building output so
    // the cleanup section never sees a stale pointer.
    taosHashCleanup(pVgroupHash);
    pVgroupHash = NULL;

    RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
      uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code));
      qDestroyQuery(pQuery);
      pQuery = NULL;
      rspObj.resIter = -1;  // rewind so the next attempt walks every block again
      continue;
    }
    break;
  }

end:
  uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  taosHashCleanup(pVgroupHash);
  destroyRequest(pRequest);
  return code;
}
2397

2398
/*
 * Dispatches a single meta response to the matching process* converter,
 * selected by pMetaRsp->resMsgType; the converter receives `meta` as its
 * output argument. Message types not listed here are ignored and `meta` is
 * left untouched.
 */
static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
  if (meta == NULL || pMetaRsp == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }

  switch (pMetaRsp->resMsgType) {
    case TDMT_VND_CREATE_STB:
      processCreateStb(pMetaRsp, meta);
      break;
    case TDMT_VND_ALTER_STB:
      processAlterStb(pMetaRsp, meta);
      break;
    case TDMT_VND_DROP_STB:
      processDropSTable(pMetaRsp, meta);
      break;
    case TDMT_VND_CREATE_TABLE:
      processCreateTable(pMetaRsp, meta);
      break;
    case TDMT_VND_ALTER_TABLE:
      processAlterTable(pMetaRsp, meta);
      break;
    case TDMT_VND_DROP_TABLE:
      processDropTable(pMetaRsp, meta);
      break;
    case TDMT_VND_DELETE:
      processDeleteTable(pMetaRsp, meta);
      break;
    default:
      break;
  }
}
2419

2420
/*
 * Converts a batch meta response into a JSON string.
 *
 * The batch buffer is decoded with tDecodeMqBatchMetaRsp; each entry is then
 * decoded (after skipping its SMqRspHead) with tDecodeMqMetaRsp and converted
 * by processSimpleMeta(). The result is an object holding "tmq_meta_version"
 * and a "metas" array.
 *
 * On success *string receives the buffer returned by cJSON_PrintUnformatted.
 * On any failure *string is left unchanged.
 */
static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
  if (pMsgRsp == NULL || string == NULL) {
    uError("invalid parameter in %s", __func__);
    return;
  }
  SDecoder        coder = {0};
  SMqBatchMetaRsp rsp = {0};
  int32_t         code = 0;  // written by the RAW_*_CHECK macros
  cJSON*          pJson = NULL;
  tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
  if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
    goto end;
  }

  pJson = cJSON_CreateObject();
  RAW_NULL_CHECK(pJson);
  RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION));
  cJSON* pMetaArr = cJSON_CreateArray();
  RAW_NULL_CHECK(pMetaArr);
  RAW_FALSE_CHECK(tmqAddJsonObjectItem(pJson, "metas", pMetaArr));

  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);
    // An entry shorter than its header would make the length below wrap around.
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      goto end;
    }
    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
      // release whatever a partial decode may have produced
      tDeleteMqMetaRsp(&metaRsp);
      tDecoderClear(&metaCoder);
      goto end;
    }
    cJSON* pItem = NULL;
    processSimpleMeta(&metaRsp, &pItem);
    tDeleteMqMetaRsp(&metaRsp);
    tDecoderClear(&metaCoder);
    RAW_FALSE_CHECK(tmqAddJsonArrayItem(pMetaArr, pItem));
  }

  *string = cJSON_PrintUnformatted(pJson);

end:
  // Shared by the success and failure paths; pJson may still be NULL here.
  cJSON_Delete(pJson);
  tDeleteMqBatchMetaRsp(&rsp);
  tDecoderClear(&coder);
}
2469

2470
/*
 * Returns a JSON description of the meta information carried by `res`, or
 * NULL when `res` is NULL, is not a META / METADATA / BATCH_META result, or
 * the conversion produced nothing.
 *
 * This file provides tmq_free_json_meta() to release the returned string.
 */
char* tmq_get_json_meta(TAOS_RES* res) {
  if (res == NULL) {
    uError("invalid parameter in %s", __func__);
    return NULL;
  }
  uDebug("tmq_get_json_meta res:%p", res);
  if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res) && !TD_RES_TMQ_BATCH_META(res)) {
    return NULL;
  }

  // The guard above leaves exactly these three result types to handle.
  char*      string = NULL;
  SMqRspObj* rspObj = (SMqRspObj*)res;
  if (TD_RES_TMQ_METADATA(res)) {
    processAutoCreateTable(&rspObj->dataRsp, &string);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    processBatchMetaToJson(&rspObj->batchMetaRsp, &string);
  } else if (TD_RES_TMQ_META(res)) {
    cJSON* pJson = NULL;
    processSimpleMeta(&rspObj->metaRsp, &pJson);
    string = cJSON_PrintUnformatted(pJson);
    cJSON_Delete(pJson);
  }

  // Passing a NULL pointer for %s is undefined behaviour, so substitute text.
  uDebug("tmq_get_json_meta string:%s", string != NULL ? string : "(null)");
  return string;
}
2498

2499
/* Releases a string through taosMemoryFreeClear(). */
void tmq_free_json_meta(char* jsonMeta) {
  taosMemoryFreeClear(jsonMeta);
}
×
2500

2501
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
×
2502
  if (pRsp == NULL) {
×
2503
    uError("invalid parameter in %s", __func__);
×
2504
    return TSDB_CODE_INVALID_PARA;
×
2505
  }
2506
  SEncoder coder = {0};
×
2507
  tEncoderInit(&coder, NULL, 0);
×
2508
  if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1;
×
2509
  if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1;
×
2510
  int32_t pos = coder.pos;
×
2511
  tEncoderClear(&coder);
×
2512
  return pos;
×
2513
}
2514

2515
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
2516
static int32_t  encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
×
2517
  if (raw == NULL || encodeFunc == NULL || rspObj == NULL) {
×
2518
    uError("invalid parameter in %s", __func__);
×
2519
    return TSDB_CODE_INVALID_PARA;
×
2520
  }
2521
  uint32_t len = 0;
×
2522
  int32_t  code = 0;
×
2523
  SEncoder encoder = {0};
×
2524
  void*    buf = NULL;
×
2525
  tEncodeSize(encodeFunc, rspObj, len, code);
×
2526
  if (code < 0) {
×
2527
    code = TSDB_CODE_INVALID_MSG;
×
2528
    goto FAILED;
×
2529
  }
2530
  len += sizeof(int8_t) + sizeof(int32_t);
×
2531
  buf = taosMemoryCalloc(1, len);
×
2532
  if (buf == NULL) {
×
2533
    code = terrno;
×
2534
    goto FAILED;
×
2535
  }
2536
  tEncoderInit(&encoder, buf, len);
×
2537
  if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
×
2538
    code = TSDB_CODE_INVALID_MSG;
×
2539
    goto FAILED;
×
2540
  }
2541
  int32_t offsetLen = getOffSetLen(rspObj);
×
2542
  if (offsetLen <= 0) {
×
2543
    code = TSDB_CODE_INVALID_MSG;
×
2544
    goto FAILED;
×
2545
  }
2546
  if (tEncodeI32(&encoder, offsetLen) < 0) {
×
2547
    code = TSDB_CODE_INVALID_MSG;
×
2548
    goto FAILED;
×
2549
  }
2550
  if (encodeFunc(&encoder, rspObj) < 0) {
×
2551
    code = TSDB_CODE_INVALID_MSG;
×
2552
    goto FAILED;
×
2553
  }
2554
  tEncoderClear(&encoder);
×
2555

2556
  raw->raw = buf;
×
2557
  raw->raw_len = len;
×
2558
  return code;
×
2559
FAILED:
×
2560
  tEncoderClear(&encoder);
×
2561
  taosMemoryFree(buf);
×
2562
  return code;
×
2563
}
2564

2565
/*
 * Fills `raw` with the raw representation of the TMQ result `res`.
 *
 * `raw` is zeroed first. What raw->raw points at depends on the result type:
 *   - META / BATCH_META: the buffer held inside `res` (no copy is made).
 *   - TMQ / METADATA:    a new buffer produced by encodeMqDataRsp().
 *   - RAW:               the result's rawData pointer, which is then set to
 *                        NULL inside `res`.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments,
 * TSDB_CODE_TMQ_INVALID_MSG for an unknown result type, or the error returned
 * by encodeMqDataRsp().
 */
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (raw == NULL || res == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  *raw = (tmq_raw_data){0};
  SMqRspObj* rspObj = ((SMqRspObj*)res);
  if (TD_RES_TMQ_META(res)) {
    raw->raw = rspObj->metaRsp.metaRsp;
    // a negative length is reported as 0
    raw->raw_len = rspObj->metaRsp.metaRspLen >= 0 ? rspObj->metaRsp.metaRspLen : 0;
    raw->raw_type = rspObj->metaRsp.resMsgType;
    uDebug("tmq get raw type meta:%p", raw);
  } else if (TD_RES_TMQ(res)) {
    int32_t code = encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw);
    if (code != 0) {
      // log the code actually returned; terrno is not set on every failure path
      uError("tmq get raw type error:%d", code);
      return code;
    }
    raw->raw_type = RES_TYPE__TMQ;
    uDebug("tmq get raw type data:%p", raw);
  } else if (TD_RES_TMQ_METADATA(res)) {
    int32_t code = encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw);
    if (code != 0) {
      uError("tmq get raw type error:%d", code);
      return code;
    }
    raw->raw_type = RES_TYPE__TMQ_METADATA;
    uDebug("tmq get raw type metadata:%p", raw);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    raw->raw = rspObj->batchMetaRsp.pMetaBuff;
    raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw batch meta:%p", raw);
  } else if (TD_RES_TMQ_RAW(res)) {
    // hand the pointer over to `raw` and detach it from the result
    raw->raw = rspObj->dataRsp.rawData;
    rspObj->dataRsp.rawData = NULL;
    raw->raw_len = rspObj->dataRsp.len;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw raw:%p", raw);
  } else {
    uError("tmq get raw error type:%d", *(int8_t*)res);
    return TSDB_CODE_TMQ_INVALID_MSG;
  }
  return TSDB_CODE_SUCCESS;
}
2610

2611
/*
 * Releases the buffer referenced by `raw`, according to raw.raw_type:
 *   - RES_TYPE__TMQ / RES_TYPE__TMQ_METADATA: raw.raw itself is freed.
 *   - RES_TYPE__TMQ_RAWDATA: the address sizeof(SMqRspHead) bytes before
 *     raw.raw is freed.
 *   - any other type: nothing is freed.
 * The global error message buffer is cleared in every case.
 */
void tmq_free_raw(tmq_raw_data raw) {
  uDebug("tmq free raw data type:%d", raw.raw_type);
  if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
    taosMemoryFree(raw.raw);
  } else if (raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL) {
    // Step back with an explicit subtraction: negating sizeof yields a huge
    // unsigned value, and adding that to a pointer relies on wrap-around.
    taosMemoryFree((char*)raw.raw - sizeof(SMqRspHead));
  }
  (void)memset(terrMsg, 0, ERR_MSG_LEN);
}
×
2620

2621
static int32_t writeRawInit() {
×
2622
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
×
2623
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
×
2624
    if (old == 0) {
×
2625
      int32_t code = initRawCacheHash();
×
2626
      if (code != 0) {
×
2627
        uError("tmq writeRawImpl init error:%d", code);
×
2628
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
2629
        return code;
×
2630
      }
2631
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
×
2632
    }
2633
  }
2634

2635
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
×
2636
    return TSDB_CODE_INTERNAL_ERROR;
×
2637
  }
2638
  return 0;
×
2639
}
2640

2641
/*
 * Routes a raw payload to the writer that matches `type`, after making sure
 * writeRawInit() has succeeded.
 *
 * Returns the selected writer's result, TSDB_CODE_INVALID_PARA for NULL
 * arguments or an unrecognised type, and TSDB_CODE_INTERNAL_ERROR when
 * writeRawInit() fails.
 */
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
  if (buf == NULL || taos == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  if (writeRawInit() != 0) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  // Super-table create and alter are both handled by taosCreateStb().
  if (type == TDMT_VND_CREATE_STB || type == TDMT_VND_ALTER_STB) {
    return taosCreateStb(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_STB) {
    return taosDropStb(taos, buf, len);
  }
  if (type == TDMT_VND_CREATE_TABLE) {
    return taosCreateTable(taos, buf, len);
  }
  if (type == TDMT_VND_ALTER_TABLE) {
    return taosAlterTable(taos, buf, len);
  }
  if (type == TDMT_VND_DROP_TABLE) {
    return taosDropTable(taos, buf, len);
  }
  if (type == TDMT_VND_DELETE) {
    return taosDeleteData(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_METADATA) {
    return tmqWriteRawMetaDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_RAWDATA) {
    return tmqWriteRawRawDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ) {
    return tmqWriteRawDataImpl(taos, buf, len);
  }
  if (type == RES_TYPE__TMQ_BATCH_META) {
    return tmqWriteBatchMetaDataImpl(taos, buf, len);
  }

  return TSDB_CODE_INVALID_PARA;
}
2675

2676
/*
 * Public entry point: writes the raw payload `raw` into the connection `taos`.
 *
 * Rejects a NULL connection, a NULL buffer or a zero length with
 * TSDB_CODE_INVALID_PARA and records an error message. Otherwise it clears
 * the global error message and returns the result of writeRawImpl().
 */
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
  const bool usable = (taos != NULL) && (raw.raw != NULL) && !(raw.raw_len <= 0);
  if (!usable) {
    SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
    return TSDB_CODE_INVALID_PARA;
  }

  taosClearErrMsg();  // start from a clean global error message
  return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
}
2685

2686
/*
 * Writes a batch meta payload into the connection `taos`.
 *
 * The batch is decoded with tDecodeMqBatchMetaRsp. Each entry is decoded
 * (after skipping its SMqRspHead) with tDecodeMqMetaRsp and passed to
 * writeRawImpl() with the entry's own message type. Processing stops at the
 * first entry that fails.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments or
 * undecodable / truncated input, or the error from writeRawImpl().
 */
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SMqBatchMetaRsp rsp = {0};
  SDecoder        coder = {0};
  int32_t         code = TSDB_CODE_SUCCESS;

  // decode and process req
  tDecoderInit(&coder, meta, metaLen);
  if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);
    // An entry shorter than its header would make the length below wrap around.
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }
    SDecoder   metaCoder = {0};
    SMqMetaRsp metaRsp = {0};
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    if (tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0) {
      // release whatever a partial decode may have produced
      tDeleteMqMetaRsp(&metaRsp);
      tDecoderClear(&metaCoder);
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }
    code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
    tDeleteMqMetaRsp(&metaRsp);
    tDecoderClear(&metaCoder);
    if (code != TSDB_CODE_SUCCESS) {
      goto end;
    }
  }

end:
  tDeleteMqBatchMetaRsp(&rsp);
  tDecoderClear(&coder);
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc