• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5035

24 Apr 2026 11:25AM UTC coverage: 73.06% (+0.002%) from 73.058%
#5035

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1344 of 1975 new or added lines in 48 files covered. (68.05%)

14127 existing lines in 142 files now uncovered.

275902 of 377640 relevant lines covered (73.06%)

132208813.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.22
/source/client/src/clientRawBlockWrite.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include <string.h>
17
#include "cJSON.h"
18
#include "clientInt.h"
19
#include "clientRawBlockWrite.h"
20
#include "nodes.h"
21
#include "osMemPool.h"
22
#include "osMemory.h"
23
#include "parser.h"
24
#include "taosdef.h"
25
#include "tarray.h"
26
#include "tbase64.h"
27
#include "tcol.h"
28
#include "tcompression.h"
29
#include "tdatablock.h"
30
#include "tdataformat.h"
31
#include "tdef.h"
32
#include "tglobal.h"
33
#include "tmsgtype.h"
34

35
// Forward declaration of a file-local (static) function; its definition is not part of this section.
static int32_t  tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen);
36
/*
 * Offset a super table uid by the 32-bit Murmur hash of the database name.
 * A NULL database name leaves the uid unchanged.
 */
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
  return (db != NULL) ? suid + MurmurHash3_32(db, strlen(db)) : suid;
}
42

43
/*
 * Replay a "create super table" raw meta message against the connection's current database.
 *
 * The payload that follows the SMsgHead is decoded as an SVCreateStbReq, converted into an
 * SMCreateStbReq and submitted through launchQueryImpl as a TDMT_MND_CREATE_STB RPC with
 * syncQuery set. When the request succeeds, the table's entry is removed from the catalog.
 *
 * @param taos     connection handle; dereferenced as an int64_t id for buildRequest
 * @param meta     raw message buffer that starts with an SMsgHead
 * @param metaLen  size of meta in bytes; must be at least sizeof(SMsgHead)
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for bad arguments,
 *         TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database, or the code of
 *         the first failing step / of the request itself.
 */
static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
  // A metaLen shorter than the header would wrap the unsigned payload length computed below.
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateStbReq req = {0};
  SDecoder       coder = {0};
  SMCreateStbReq pReq = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SRequestObj*   pRequest = NULL;
  SCmdMsgInfo    pCmdMsg = {0};
  RAW_LOG_START

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create stable, meta:%p, metaLen:%d, db:%s", LOG_ID_VALUE, meta, metaLen,
         pRequest->pDb ? pRequest->pDb : "NULL");
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    goto end;
  }
  // decode and process req: the encoded request starts right after the message head
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVCreateStbReq(&coder, &req));

  // When the decoded request carries no per-column compression entries, fall back to
  // the per-type default for every column.
  int8_t           createDefaultCompress = 0;
  SColCmprWrapper* p = &req.colCmpr;
  if (p->nCols == 0) {
    createDefaultCompress = 1;
  }
  // build create stable
  pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SFieldWithOptions));
  RAW_NULL_CHECK(pReq.pColumns);
  for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
    SSchema*          pSchema = req.schemaRow.pSchema + i;
    SFieldWithOptions field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);

    if (createDefaultCompress) {
      field.compress = createDefaultColCmprByType(pSchema->type);
    } else {
      SColCmpr* pCmp = &req.colCmpr.pColCmpr[i];
      field.compress = pCmp->alg;
    }
    if (req.pExtSchemas) field.typeMod = req.pExtSchemas[i].typeMod;
    RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field));
  }
  pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
  RAW_NULL_CHECK(pReq.pTags);
  for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
    SSchema* pSchema = req.schemaTag.pSchema + i;
    SField   field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
    tstrncpy(field.name, pSchema->name, TSDB_COL_NAME_LEN);
    RAW_NULL_CHECK(taosArrayPush(pReq.pTags, &field));
  }

  pReq.colVer = req.schemaRow.version;
  pReq.tagVer = req.schemaTag.version;
  pReq.numOfColumns = req.schemaRow.nCols;
  pReq.numOfTags = req.schemaTag.nCols;
  pReq.commentLen = -1;
  // The suid sent to the mnode is the decoded suid offset by a hash of the database name.
  pReq.suid = processSuid(req.suid, pRequest->pDb);
  pReq.source = TD_REQ_FROM_TAOX;
  pReq.igExists = true;
  pReq.virtualStb = req.virtualStb;
  pReq.securityLevel = req.securityLevel;  // Preserve source cluster's security classification

  uDebug(LOG_ID_TAG " create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));
  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_CREATE_STB;
  // First pass with a NULL buffer yields the serialized size, second pass fills the buffer.
  pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  RAW_FALSE_CHECK(tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SCatalog* pCatalog = NULL;
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  destroyRequest(pRequest);
  tFreeSMCreateStbReq(&pReq);
  tDecoderClear(&coder);
  taosMemoryFree(pCmdMsg.pMsg);
  RAW_LOG_END
  return code;
}
153

154
/*
 * Replay a "drop super table" raw meta message against the connection's current database.
 *
 * The payload that follows the SMsgHead is decoded as an SVDropStbReq. The table's uid is
 * looked up through the catalog; if the catalog reports TSDB_CODE_PAR_TABLE_NOT_EXIST the
 * drop is skipped and success is returned. Otherwise an SMDropStbReq is submitted through
 * launchQueryImpl as a TDMT_MND_DROP_STB RPC and, on success, the table's entry is removed
 * from the catalog.
 *
 * @param taos     connection handle; dereferenced as an int64_t id for buildRequest
 * @param meta     raw message buffer that starts with an SMsgHead
 * @param metaLen  size of meta in bytes; must be at least sizeof(SMsgHead)
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for bad arguments,
 *         TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database, or the code of
 *         the first failing step / of the request itself.
 */
static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) {
  // A metaLen shorter than the header would wrap the unsigned payload length computed below.
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropStbReq req = {0};
  SDecoder     coder = {0};
  SMDropStbReq pReq = {0};
  int32_t      code = TSDB_CODE_SUCCESS;
  int32_t      lino = 0;
  SRequestObj* pRequest = NULL;
  SCmdMsgInfo  pCmdMsg = {0};

  RAW_LOG_START
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req: the encoded request starts right after the message head
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVDropStbReq(&coder, &req));
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
  SName            pName = {0};
  toName(pRequest->pTscObj->acctId, pRequest->pDb, req.name, &pName);
  STableMeta* pTableMeta = NULL;
  code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
    // Nothing to drop on this side: treat as success.
    uInfo(LOG_ID_TAG " stable %s not exist, ignore drop", LOG_ID_VALUE, req.name);
    code = TSDB_CODE_SUCCESS;
    taosMemoryFreeClear(pTableMeta);
    goto end;
  }
  RAW_RETURN_CHECK(code);
  // Use the uid reported by the catalog rather than the suid carried in the message.
  pReq.suid = pTableMeta->uid;
  taosMemoryFreeClear(pTableMeta);

  // build drop stable
  pReq.igNotExists = true;
  pReq.source = TD_REQ_FROM_TAOX;

  uDebug(LOG_ID_TAG " drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, req.name, req.suid,
         pReq.suid);
  STscObj* pTscObj = pRequest->pTscObj;
  SName    tableName = {0};
  toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName);
  RAW_RETURN_CHECK(tNameExtractFullName(&tableName, pReq.name));

  pCmdMsg.epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  pCmdMsg.msgType = TDMT_MND_DROP_STB;
  // First pass with a NULL buffer yields the serialized size, second pass fills the buffer.
  pCmdMsg.msgLen = tSerializeSMDropStbReq(NULL, 0, &pReq);
  RAW_FALSE_CHECK(pCmdMsg.msgLen > 0);
  pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
  RAW_NULL_CHECK(pCmdMsg.pMsg);
  RAW_FALSE_CHECK(tSerializeSMDropStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq) > 0);

  SQuery pQuery = {0};
  pQuery.execMode = QUERY_EXEC_MODE_RPC;
  pQuery.pCmdMsg = &pCmdMsg;
  pQuery.msgType = pQuery.pCmdMsg->msgType;
  pQuery.stableQuery = true;

  launchQueryImpl(pRequest, &pQuery, true, NULL);  // ignore, because return value is pRequest
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
    RAW_RETURN_CHECK(catalogRemoveTableMeta(pCatalog, &tableName));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  RAW_LOG_END
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  // Release the serialized request the same way taosCreateStb's cleanup does; without this
  // the buffer leaks when a step between its allocation and the launch fails.
  taosMemoryFree(pCmdMsg.pMsg);
  return code;
}
243

244
// Per-vgroup bucket of create-table requests; taosCreateTable stores one of these in its
// hash map for each vgroup id and destroyCreateTbReqBatch releases req.pArray.
typedef struct SVgroupCreateTableBatch {
245
  SVCreateTbBatchReq req;  // req.pArray collects the SVCreateTbReq entries routed to this vgroup
246
  SVgroupInfo        info;  // vgroup the requests are routed to
247
  char               dbName[TSDB_DB_NAME_LEN];  // database name copied from the request
248
} SVgroupCreateTableBatch;
249

250
static void destroyCreateTbReqBatch(void* data) {
55,896✔
251
  if (data == NULL) {
55,896✔
UNCOV
252
    uError("invalid parameter in %s", __func__);
×
253
    return;
×
254
  }
255
  SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
55,896✔
256
  taosArrayDestroy(pTbBatch->req.pArray);
55,896✔
257
}
258

259
static const SSchema* getNormalColSchema(const STableMeta* pTableMeta, const char* pColName) {
6,876✔
260
  for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
19,869✔
261
    const SSchema* pSchema = pTableMeta->schema + i;
19,869✔
262
    if (0 == strcmp(pColName, pSchema->name)) {
19,869✔
263
      return pSchema;
6,876✔
264
    }
265
  }
UNCOV
266
  return NULL;
×
267
}
268

269
/*
 * Fetch the meta of table dbName.tbName (under account acctId) through the catalog.
 * On success the meta is returned and the caller is responsible for releasing it.
 * On failure the error is logged, terrno is set to the catalog's code and NULL is returned.
 */
static STableMeta* getTableMeta(SCatalog* pCatalog, SRequestConnInfo* conn, char* dbName, char* tbName, int32_t acctId){
  STableMeta* pMeta = NULL;
  SName       fullName = {0};

  toName(acctId, dbName, tbName, &fullName);
  int32_t rc = catalogGetTableMeta(pCatalog, conn, &fullName, &pMeta);
  if (rc == 0) {
    return pMeta;
  }

  uError("failed to get table meta for reference table:%s.%s", dbName, tbName);
  taosMemoryFreeClear(pMeta);
  terrno = rc;
  return NULL;
}
282

283
static int32_t checkColRef(STableMeta* pTableMeta, char* colName, uint8_t precision, const char* pSchemaName,
6,492✔
284
                           const SDataType* pType) {
285
  int32_t code = TSDB_CODE_SUCCESS;
6,492✔
286
  if (pTableMeta->tableInfo.precision != precision) {
6,492✔
UNCOV
287
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE;
×
288
    uError("timestamp precision of virtual table and its reference table do not match");
×
289
    goto end;
×
290
  }
291
  // org table cannot has composite primary key
292
  if (pTableMeta->tableInfo.numOfColumns > 1 && pTableMeta->schema[1].flags & COL_IS_KEY) {
6,492✔
UNCOV
293
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
×
294
    uError("virtual table's column:\"%s\"'s reference can not from table with composite key", colName);
×
295
    goto end;
×
296
  }
297

298
  // org table must be child table or normal table
299
  if (pTableMeta->tableType != TSDB_NORMAL_TABLE && pTableMeta->tableType != TSDB_CHILD_TABLE) {
6,492✔
UNCOV
300
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
×
301
    uError("virtual table's column:\"%s\"'s reference can only be normal table or child table", colName);
×
302
    goto end;
×
303
  }
304

305
  const SSchema* pRefCol = getNormalColSchema(pTableMeta, colName);
6,492✔
306
  if (NULL == pRefCol) {
6,492✔
UNCOV
307
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
×
308
    uError("virtual table's column:\"%s\"'s reference column:\"%s\" not exist", pSchemaName, colName);
×
309
    goto end;
×
310
  }
311

312
  int32_t refColIndex = getNormalColSchemaIndex(pTableMeta, colName);
6,492✔
313
  const SSchemaExt* pRefSchemaExt =
6,492✔
314
      (refColIndex >= 0 && pTableMeta->schemaExt && refColIndex < pTableMeta->tableInfo.numOfColumns)
6,492✔
315
          ? pTableMeta->schemaExt + refColIndex
6,492✔
316
          : NULL;
12,984✔
317
  SDataType refType = {0};
6,492✔
318
  schemaToRefDataType(pRefCol, NULL != pRefSchemaExt ? pRefSchemaExt->typeMod : 0, &refType);
6,492✔
319

320
  if (!isSameRefDataType(pType, &refType)) {
6,492✔
UNCOV
321
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN_TYPE;
×
322
    uError("virtual table's column:\"%s\"'s type and reference column:\"%s\"'s type not match, %d %d %d %d",
×
323
            pSchemaName, colName, pType->type, pType->bytes, refType.type, refType.bytes);
UNCOV
324
    goto end;
×
325
  }
326

327
end:
6,492✔
328
  return code;
6,492✔
329
}
330

331
/*
 * Validate one column reference of a table being created: load the meta of the table named
 * by pColRef (refDbName.refTableName) and run checkColRef for pColRef->refColName against
 * the data type derived from pSchema / pSchemaExt.
 * Returns terrno when the referenced table's meta cannot be loaded, otherwise the result
 * of checkColRef.
 */
static int32_t checkColRefForCreate(SCatalog* pCatalog, SRequestConnInfo* conn, SColRef* pColRef, int32_t acctId,
                                    uint8_t precision, SSchema* pSchema, const SSchemaExt* pSchemaExt) {
  STableMeta* pRefMeta = getTableMeta(pCatalog, conn, pColRef->refDbName, pColRef->refTableName, acctId);
  if (NULL == pRefMeta) {
    return terrno;
  }

  SDataType colType = {0};
  schemaToRefDataType(pSchema, NULL != pSchemaExt ? pSchemaExt->typeMod : 0, &colType);

  int32_t result = checkColRef(pRefMeta, pColRef->refColName, precision, pSchema->name, &colType);
  taosMemoryFreeClear(pRefMeta);
  return result;
}
343

UNCOV
344
/*
 * Validate a column reference for a column that is being added.
 *
 * dbName/tbName/colName identify the table and column handed to checkColRef as the
 * referenced side; dbNameSrc/tbNameSrc supply the table whose timestamp precision must
 * match, colNameSrc is the column name used in error messages, and type/bytes/typeMod
 * describe the data type the referenced column must have.
 *
 * Returns terrno when either table meta cannot be loaded, otherwise the result of checkColRef.
 */
static int32_t checkColRefForAdd(SCatalog* pCatalog, SRequestConnInfo* conn, int32_t acctId, char* dbName, char* tbName, char* colName,
  char* dbNameSrc, char* tbNameSrc, char* colNameSrc, int8_t type, int32_t bytes, STypeMod typeMod) {
  int32_t     code = 0;
  // Both metas are released at `end`, so both must hold a defined value before the first
  // goto; otherwise the cleanup would free an indeterminate pointer.
  STableMeta* pTableMeta = NULL;
  STableMeta* pTableMetaSrc = NULL;

  pTableMeta = getTableMeta(pCatalog, conn, dbName, tbName, acctId);
  if (pTableMeta == NULL) {
    code = terrno;
    goto end;
  }
  pTableMetaSrc = getTableMeta(pCatalog, conn, dbNameSrc, tbNameSrc, acctId);
  if (pTableMetaSrc == NULL) {
    code = terrno;
    goto end;
  }

  SDataType colType = {.type = type, .bytes = bytes};
  fillTypeFromTypeMod(&colType, typeMod);
  code = checkColRef(pTableMeta, colName, pTableMetaSrc->tableInfo.precision, colNameSrc, &colType);

end:
  taosMemoryFreeClear(pTableMeta);
  taosMemoryFreeClear(pTableMetaSrc);
  return code;
}
367

368
/*
 * Validate a column reference that is being changed on an existing column.
 *
 * dbName/tbName/colName identify the table and column handed to checkColRef as the
 * referenced side; dbNameSrc/tbNameSrc/colNameSrc identify the table and column whose
 * schema provides the expected data type and whose timestamp precision must match.
 *
 * Returns terrno when either table meta cannot be loaded,
 * TSDB_CODE_PAR_INVALID_REF_COLUMN when colNameSrc is not a column of the source table,
 * otherwise the result of checkColRef.
 */
static int32_t checkColRefForAlter(SCatalog* pCatalog, SRequestConnInfo* conn, int32_t acctId, char* dbName, char* tbName, char* colName,
  char* dbNameSrc, char* tbNameSrc, char* colNameSrc) {
  int32_t     code = 0;
  // Both metas are released at `end`, so both must hold a defined value before the first
  // goto; otherwise the cleanup would free an indeterminate pointer.
  STableMeta* pTableMeta = NULL;
  STableMeta* pTableMetaSrc = NULL;

  pTableMeta = getTableMeta(pCatalog, conn, dbName, tbName, acctId);
  if (pTableMeta == NULL) {
    code = terrno;
    goto end;
  }
  pTableMetaSrc = getTableMeta(pCatalog, conn, dbNameSrc, tbNameSrc, acctId);
  if (pTableMetaSrc == NULL) {
    code = terrno;
    goto end;
  }
  const SSchema* pSchema = getNormalColSchema(pTableMetaSrc, colNameSrc);
  if (NULL == pSchema) {
    code = TSDB_CODE_PAR_INVALID_REF_COLUMN;
    uError("virtual table's column:\"%s\" not exist", colNameSrc);
    goto end;
  }

  // The extended schema entry is only consulted when it exists and the index is in range.
  int32_t schemaIdx = getNormalColSchemaIndex(pTableMetaSrc, colNameSrc);
  const SSchemaExt* pSchemaExt =
      (schemaIdx >= 0 && pTableMetaSrc->schemaExt && schemaIdx < pTableMetaSrc->tableInfo.numOfColumns)
          ? pTableMetaSrc->schemaExt + schemaIdx
          : NULL;
  SDataType colType = {0};
  schemaToRefDataType(pSchema, NULL != pSchemaExt ? pSchemaExt->typeMod : 0, &colType);
  code = checkColRef(pTableMeta, colName, pTableMetaSrc->tableInfo.precision, pSchema->name, &colType);

end:
  taosMemoryFreeClear(pTableMeta);
  taosMemoryFreeClear(pTableMetaSrc);
  return code;
}
402

403
/*
 * Replay a "create table" raw meta message (a batch of SVCreateTbReq) against the
 * connection's current database.
 *
 * For every request in the decoded batch the target vgroup is resolved through the catalog
 * and the request is appended to a per-vgroup SVgroupCreateTableBatch. For child and virtual
 * child tables the super table meta is fetched first: the request's suid is replaced by the
 * uid from the catalog, column references are optionally redirected to tmqWriteRefDB and
 * validated (tmqWriteCheckRef), and tag values whose column id differs from the local tag
 * schema are re-encoded. Requests whose super table does not exist are skipped.
 * The batches are then serialized and executed through launchQueryImpl as
 * TDMT_VND_CREATE_TABLE; on success the created tables' meta is removed via removeMeta.
 *
 * @param taos     connection handle; dereferenced as an int64_t id for buildRequest
 * @param meta     raw message buffer that starts with an SMsgHead
 * @param metaLen  size of meta in bytes; must be at least sizeof(SMsgHead)
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for bad arguments,
 *         TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database, or the code of
 *         the first failing step / of the request itself.
 */
static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // A metaLen shorter than the header would wrap the unsigned payload length computed below.
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVCreateTbBatchReq req = {0};
  SDecoder           coder = {0};
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SRequestObj*       pRequest = NULL;
  SQuery*            pQuery = NULL;
  SHashObj*          pVgroupHashmap = NULL;
  // Super table meta of the request currently being processed. Kept at function scope so
  // that every `goto end` taken while it is held releases it in the common cleanup.
  STableMeta*        pTableMeta = NULL;

  RAW_LOG_START
  // Holds the re-encoded tags so they stay alive until the request has been executed.
  SArray* pTagList = taosArrayInit(0, POINTER_BYTES);
  RAW_NULL_CHECK(pTagList);
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d, db:%s", LOG_ID_VALUE, meta, metaLen,
         pRequest->pDb ? pRequest->pDb : "NULL");

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req: the encoded request starts right after the message head
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVCreateTbBatchReq(&coder, &req));
  STscObj* pTscObj = pRequest->pTscObj;

  SVCreateTbReq* pCreateReq = NULL;
  SCatalog*      pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pCreateReq->name, &pName);
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo));

    pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
    // change tag cid to new cid
    if (pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) {
      SName sName = {0};
      toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
      code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        uInfo(LOG_ID_TAG " super table %s not exist, ignore create child table %s", LOG_ID_VALUE,
              pCreateReq->ctb.stbName, pCreateReq->name);
        code = TSDB_CODE_SUCCESS;
        taosMemoryFreeClear(pTableMeta);
        continue;
      }

      RAW_RETURN_CHECK(code);
      // Use the uid reported by the catalog rather than the suid carried in the message.
      pCreateReq->ctb.suid = pTableMeta->uid;

      // When tmqWriteRefDB is non-empty, every column reference is redirected to that database.
      bool changeDB = strlen(tmqWriteRefDB) > 0;
      for (int32_t i = 0; changeDB && i < pCreateReq->colRef.nCols; i++) {
        SColRef* pColRef = pCreateReq->colRef.pColRef + i;
        tstrncpy(pColRef->refDbName, tmqWriteRefDB, TSDB_DB_NAME_LEN);
      }

      for (int32_t i = 0; tmqWriteCheckRef && i < pCreateReq->colRef.nCols && i < pTableMeta->tableInfo.numOfColumns; i++) {
        SColRef* pColRef = pCreateReq->colRef.pColRef + i;
        if (!pColRef || !pColRef->hasRef) continue;
        SSchema* pSchema = pTableMeta->schema + i;
        const SSchemaExt* pSchemaExt =
            (pTableMeta->schemaExt && i < pTableMeta->tableInfo.numOfColumns) ? pTableMeta->schemaExt + i : NULL;
        // On failure this jumps to `end`, which releases pTableMeta.
        RAW_RETURN_CHECK(
            checkColRefForCreate(pCatalog, &conn, pColRef, pTscObj->acctId, pTableMeta->tableInfo.precision, pSchema,
                                 pSchemaExt));
      }

      SArray* pTagVals = NULL;
      code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
      if (code != TSDB_CODE_SUCCESS) {
        uError("create tb invalid tag data %s", pCreateReq->name);
        taosMemoryFreeClear(pTableMeta);
        goto end;
      }

      // Match each tag name of the request against the local tag schema and adopt the local
      // column id where it differs; JSON tags are left untouched.
      bool rebuildTag = false;
      for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) {
        char* tName = taosArrayGet(pCreateReq->ctb.tagName, i);
        if (tName == NULL) {
          continue;
        }
        for (int32_t j = pTableMeta->tableInfo.numOfColumns;
             j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) {
          SSchema* tag = &pTableMeta->schema[j];
          if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
            STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
            if (pTagVal) {
              if (pTagVal->cid != tag->colId) {
                pTagVal->cid = tag->colId;
                rebuildTag = true;
              }
            } else {
              uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
                     (int)taosArrayGetSize(pTagVals), i, tag->colId);
            }
          }
        }
      }
      taosMemoryFreeClear(pTableMeta);
      if (rebuildTag) {
        STag* ppTag = NULL;
        code = tTagNew(pTagVals, 1, false, &ppTag);
        taosArrayDestroy(pTagVals);
        pTagVals = NULL;
        if (code != TSDB_CODE_SUCCESS) {
          goto end;
        }
        if (NULL == taosArrayPush(pTagList, &ppTag)) {
          code = terrno;
          tTagFree(ppTag);
          goto end;
        }
        pCreateReq->ctb.pTag = (uint8_t*)ppTag;
      }
      taosArrayDestroy(pTagVals);
    }
    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));

    SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupCreateTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tstrncpy(tBatch.dbName, pRequest->pDb, TSDB_DB_NAME_LEN);

      tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);
      RAW_NULL_CHECK(taosArrayPush(tBatch.req.pArray, pCreateReq));
      tBatch.req.source = TD_REQ_FROM_TAOX;
      RAW_RETURN_CHECK(taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch)));
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pCreateReq));
    }
  }

  // Every request was skipped: nothing to send.
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsCreateTableBatch(pVgroupHashmap, &pBufArray));
  pQuery = NULL;
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_CREATE_TABLE;
  pQuery->stableQuery = false;
  code = nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_NULL_CHECK(pQuery->pRoot);

  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }

  code = pRequest->code;
  uDebug(LOG_ID_TAG " create table return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  taosMemoryFreeClear(pTableMeta);
  tDeleteSVCreateTbBatchReq(&req);

  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  taosArrayDestroyP(pTagList, NULL);
  RAW_LOG_END
  return code;
}
599

600
// Drop-table counterpart of SVgroupCreateTableBatch; destroyDropTbReqBatch releases req.pArray.
// NOTE(review): presumably one entry per vgroup, filled by taosDropTable - confirm against its body.
typedef struct SVgroupDropTableBatch {
601
  SVDropTbBatchReq req;  // req.pArray is the array released by destroyDropTbReqBatch
602
  SVgroupInfo      info;
603
  char             dbName[TSDB_DB_NAME_LEN];
604
} SVgroupDropTableBatch;
605

606
static void destroyDropTbReqBatch(void* data) {
1,598✔
607
  if (data == NULL) {
1,598✔
UNCOV
608
    uError("invalid parameter in %s", __func__);
×
609
    return;
×
610
  }
611
  SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
1,598✔
612
  taosArrayDestroy(pTbBatch->req.pArray);
1,598✔
613
}
614

615
/**
 * Replay a raw "drop table" batch message against the current connection.
 *
 * The message is decoded into a SVDropTbBatchReq, every table that still
 * exists is grouped by the vgroup that hosts it, and one TDMT_VND_DROP_TABLE
 * query carrying all groups is executed synchronously. Tables that no longer
 * exist are skipped.
 *
 * @param taos     connection handle (dereferenced as an int64_t connection id)
 * @param meta     raw message; the payload starts after a SMsgHead
 * @param metaLen  total length of meta in bytes, including the SMsgHead
 * @return TSDB_CODE_SUCCESS on success (also when nothing had to be dropped),
 *         TSDB_CODE_INVALID_PARA for NULL arguments or a message shorter than
 *         its header, otherwise the first error code encountered.
 */
static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // A message shorter than its header would make "metaLen - sizeof(SMsgHead)"
  // wrap around and hand the decoder a bogus length.
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVDropTbBatchReq req = {0};
  SDecoder         coder = {0};
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SRequestObj*     pRequest = NULL;
  SQuery*          pQuery = NULL;
  SHashObj*        pVgroupHashmap = NULL;

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);

  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeSVDropTbBatchReq(&coder, &req));
  STscObj* pTscObj = pRequest->pTscObj;

  SVDropTbReq* pDropReq = NULL;
  SCatalog*    pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));

  // vgId -> SVgroupDropTableBatch; the free callback releases each batch array
  pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  RAW_NULL_CHECK(pVgroupHashmap);
  taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
  pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
  RAW_NULL_CHECK(pRequest->tableList);
  // group every table to drop by the vgroup that hosts it
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;
    pDropReq->igNotExists = true;
    //    pDropReq->suid = processSuid(pDropReq->suid, pRequest->pDb);

    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, pDropReq->name, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    PROCESS_TABLE_NOT_EXIST(code, pDropReq->name)
    STableMeta* pTableMeta = NULL;
    code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      code = TSDB_CODE_SUCCESS;
      uInfo(LOG_ID_TAG " table %s not exist, ignore drop", LOG_ID_VALUE, pDropReq->name);
      taosMemoryFreeClear(pTableMeta);
      continue;
    }
    RAW_RETURN_CHECK(code);
    // replace the suid carried by the message with the one of the local table
    tb_uid_t oldSuid = pDropReq->suid;
    pDropReq->suid = pTableMeta->suid;
    taosMemoryFreeClear(pTableMeta);
    uDebug(LOG_ID_TAG " drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid,
           pDropReq->suid);

    RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName));
    SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
    if (pTableBatch == NULL) {
      SVgroupDropTableBatch tBatch = {0};
      tBatch.info = pInfo;
      tBatch.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
      RAW_NULL_CHECK(tBatch.req.pArray);

      // Until taosHashPut succeeds the array is owned by this scope only, so
      // it must be released here on failure (the hash free callback never sees it).
      void* pPushed = taosArrayPush(tBatch.req.pArray, pDropReq);
      if (pPushed == NULL) {
        taosArrayDestroy(tBatch.req.pArray);
      }
      RAW_NULL_CHECK(pPushed);

      code = taosHashPut(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId), &tBatch, sizeof(tBatch));
      if (code != TSDB_CODE_SUCCESS) {
        taosArrayDestroy(tBatch.req.pArray);
      }
      RAW_RETURN_CHECK(code);
    } else {  // add to the correct vgroup
      RAW_NULL_CHECK(taosArrayPush(pTableBatch->req.pArray, pDropReq));
    }
  }

  // every table was already gone: nothing to send
  if (taosHashGetSize(pVgroupHashmap) == 0) {
    goto end;
  }
  SArray* pBufArray = NULL;
  RAW_RETURN_CHECK(serializeVgroupsDropTableBatch(pVgroupHashmap, &pBufArray));
  code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
  if (TSDB_CODE_SUCCESS != code) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_DROP_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  code = nodesMakeNode(QUERY_NODE_DROP_TABLE_STMT, &pQuery->pRoot);
  if (TSDB_CODE_SUCCESS != code) goto end;
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pBufArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    // drop the cached meta of the tables that were just removed
    RAW_RETURN_CHECK(removeMeta(pTscObj, pRequest->tableList, false));
  }
  code = pRequest->code;
  uDebug(LOG_ID_TAG " drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  taosHashCleanup(pVgroupHashmap);
  destroyRequest(pRequest);
  tDecoderClear(&coder);
  qDestroyQuery(pQuery);
  RAW_LOG_END
  return code;
}
728

729
/**
 * Replay a raw "delete data" message by turning it into a DELETE statement
 * and executing it on the connection.
 *
 * @param taos     connection handle (dereferenced as an int64_t connection id)
 * @param meta     raw message; the payload starts after a SMsgHead
 * @param metaLen  total length of meta in bytes, including the SMsgHead
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_PAR_TABLE_NOT_EXIST and
 *         TSDB_CODE_PAR_GET_META_ERROR from the query are treated as success.
 *         TSDB_CODE_INVALID_PARA for NULL arguments, a message shorter than
 *         its header, or a statement that cannot be formatted.
 */
static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) {
  // A message shorter than its header would make "metaLen - sizeof(SMsgHead)"
  // wrap around and hand the decoder a bogus length.
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SDeleteRes req = {0};
  SDecoder   coder = {0};
  char       sqlBuf[256] = {0};
  char*      heapSql = NULL;  // only used when the statement does not fit into sqlBuf
  char*      sql = sqlBuf;
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);

  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&coder, data, len);
  RAW_RETURN_CHECK(tDecodeDeleteRes(&coder, &req));

  // Never run a truncated statement: a cut after the lower bound would still
  // parse and would delete a wider time range than requested.
  int sqlLen = snprintf(sqlBuf, sizeof(sqlBuf), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                        req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
  if (sqlLen < 0) {
    uError("connId:0x%" PRIx64 " failed to format delete sql in %s", *(int64_t*)taos, __func__);
    code = TSDB_CODE_INVALID_PARA;
    goto end;
  }
  if ((size_t)sqlLen >= sizeof(sqlBuf)) {
    heapSql = taosMemoryMalloc((size_t)sqlLen + 1);
    RAW_NULL_CHECK(heapSql);
    (void)snprintf(heapSql, (size_t)sqlLen + 1, "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64,
                   req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey);
    sql = heapSql;
  }

  TAOS_RES* res = taosQueryImpl(taos, sql, false, TD_REQ_FROM_TAOX);
  RAW_NULL_CHECK(res);
  SRequestObj* pRequest = (SRequestObj*)res;
  code = pRequest->code;
  // a missing target table means there is nothing to delete
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) {
    code = TSDB_CODE_SUCCESS;
  }
  taos_free_result(res);
  uDebug("connId:0x%" PRIx64 " delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code));

end:
  RAW_LOG_END
  taosMemoryFreeClear(heapSql);
  tDecoderClear(&coder);
  return code;
}
764

765
/**
 * Replay a raw "alter table" message against the current connection.
 *
 * The message is decoded into a SVAlterTbReq and re-encoded into one or more
 * per-vnode messages, depending on req.action:
 *   - TSDB_ALTER_TABLE_UPDATE_OPTIONS: ignored, returns success;
 *   - TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL: the tables in req.tables are
 *     grouped by hosting vgroup and one message is built per vgroup;
 *   - TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL: the same request is sent to
 *     every vgroup of the database;
 *   - anything else: a single message for the vgroup hosting req.tbName.
 * The messages are then executed synchronously as one TDMT_VND_ALTER_TABLE query.
 *
 * Ownership: each SVgDataBlocks owns its pData buffer as soon as the buffer is
 * attached, and pArray owns an SVgDataBlocks once it has been pushed. Whatever
 * is still owned locally is released at the end label.
 *
 * @param taos     connection handle (dereferenced as an int64_t connection id)
 * @param meta     raw message; the payload starts after a SMsgHead
 * @param metaLen  total length of meta in bytes, including the SMsgHead
 * @return TSDB_CODE_SUCCESS on success; TSDB_CODE_TDB_TABLE_NOT_EXIST and
 *         TSDB_CODE_NOT_FOUND from the query are treated as success.
 *         TSDB_CODE_INVALID_PARA for NULL arguments, a message shorter than its
 *         header or an empty batch; otherwise the first error encountered.
 */
static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) {
  // A message shorter than its header would make "metaLen - sizeof(SMsgHead)"
  // wrap around and hand the decoder a bogus length.
  if (taos == NULL || meta == NULL || metaLen < sizeof(SMsgHead)) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SVAlterTbReq   req = {0};
  SDecoder       dcoder = {0};
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SRequestObj*   pRequest = NULL;
  SQuery*        pQuery = NULL;
  SArray*        pArray = NULL;   // array of SVgDataBlocks*, one per target vnode
  SVgDataBlocks* pVgData = NULL;  // block under construction, not yet in pArray
  SArray*        pVgList = NULL;
  SEncoder       coder = {0};
  SHashObj*      pVgroupHashmap = NULL;  // vgId -> SArray* of SUpdateTableTagVal

  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0));
  uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen);
  pRequest->syncQuery = true;
  if (!pRequest->pDb) {
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }
  // decode and process req
  void*    data = POINTER_SHIFT(meta, sizeof(SMsgHead));
  uint32_t len = metaLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);
  RAW_RETURN_CHECK(tDecodeSVAlterTbReq(&dcoder, &req));
  // do not deal TSDB_ALTER_TABLE_UPDATE_OPTIONS
  if (req.action == TSDB_ALTER_TABLE_UPDATE_OPTIONS) {
    uInfo(LOG_ID_TAG " alter table action is UPDATE_OPTIONS, ignore", LOG_ID_VALUE);
    goto end;
  }

  STscObj*  pTscObj = pRequest->pTscObj;
  SCatalog* pCatalog = NULL;
  RAW_RETURN_CHECK(catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog));
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // Handle Type 1 batch modification with vnode grouping
  if (req.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL) {
    if (req.tables == NULL || taosArrayGetSize(req.tables) == 0) {
      uError(LOG_ID_TAG " Type 1 batch alter has empty tables array", LOG_ID_VALUE);
      code = TSDB_CODE_INVALID_PARA;
      goto end;
    }

    int32_t nTables = taosArrayGetSize(req.tables);
    uDebug(LOG_ID_TAG " Type 1 batch alter with %d tables, grouping by vnode", LOG_ID_VALUE, nTables);

    // Create hashmap to group tables by vgId
    pVgroupHashmap = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
    RAW_NULL_CHECK(pVgroupHashmap);

    // Group tables by vnode
    for (int32_t i = 0; i < nTables; i++) {
      SUpdateTableTagVal* pTable = taosArrayGet(req.tables, i);
      if (pTable == NULL || pTable->tbName == NULL) {
        uWarn(LOG_ID_TAG " Type 1 batch alter table[%d] has invalid name, skip", LOG_ID_VALUE, i);
        continue;
      }

      // Query vnode for this table
      SVgroupInfo vgInfo = {0};
      SName       pName = {0};
      toName(pTscObj->acctId, pRequest->pDb, pTable->tbName, &pName);
      code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgInfo);
      PROCESS_TABLE_NOT_EXIST(code, pTable->tbName)

      // Add table to corresponding vnode's array
      SArray** ppTables = taosHashGet(pVgroupHashmap, &vgInfo.vgId, sizeof(int32_t));
      if (ppTables == NULL) {
        SArray* pTables = taosArrayInit(16, sizeof(SUpdateTableTagVal));
        RAW_NULL_CHECK(pTables);
        code = taosHashPut(pVgroupHashmap, &vgInfo.vgId, sizeof(int32_t), &pTables, sizeof(void*));
        if (code != TSDB_CODE_SUCCESS) {
          // the map never took the array, so the cleanup at "end" cannot see it
          taosArrayDestroy(pTables);
        }
        RAW_RETURN_CHECK(code);
        ppTables = taosHashGet(pVgroupHashmap, &vgInfo.vgId, sizeof(int32_t));
      }

      SArray* pTables = *ppTables;
      RAW_NULL_CHECK(taosArrayPush(pTables, pTable));
    }

    // Build and send separate request for each vnode
    pArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
    RAW_NULL_CHECK(pArray);

    void* pIter = taosHashIterate(pVgroupHashmap, NULL);
    while (pIter) {
      size_t   keyLen = 0;
      int32_t* pVgId = taosHashGetKey(pIter, &keyLen);
      SArray*  pTables = *(SArray**)pIter;
      int32_t  nTablesInVg = taosArrayGetSize(pTables);

      uDebug(LOG_ID_TAG " Type 1 batch alter: vgId:%d has %d tables", LOG_ID_VALUE, *pVgId, nTablesInVg);

      // Build SVAlterTbReq for this vnode
      SVAlterTbReq vgReq = {0};
      vgReq.action = req.action;
      vgReq.tbName = req.tbName;
      vgReq.source = TD_REQ_FROM_TAOX;
      vgReq.tables = pTables;

      // Encode request
      int tlen = 0;
      tEncodeSize(tEncodeSVAlterTbReq, &vgReq, tlen, code);
      if (code < 0) {
        uError(LOG_ID_TAG " Type 1 batch alter encode failed for vgId:%d, code:%s",
               LOG_ID_VALUE, *pVgId, tstrerror(code));
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }

      tlen += sizeof(SMsgHead);
      void* pMsg = taosMemoryMalloc(tlen);
      if (pMsg == NULL) {
        code = terrno;
        uError(LOG_ID_TAG " Type 1 batch alter malloc failed for vgId:%d, size:%d",
               LOG_ID_VALUE, *pVgId, tlen);
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }

      ((SMsgHead*)pMsg)->vgId = htonl(*pVgId);
      ((SMsgHead*)pMsg)->contLen = htonl(tlen);
      void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));

      SEncoder vgCoder = {0};
      tEncoderInit(&vgCoder, pBuf, tlen - sizeof(SMsgHead));
      code = tEncodeSVAlterTbReq(&vgCoder, &vgReq);
      tEncoderClear(&vgCoder);

      if (code < 0) {
        uError(LOG_ID_TAG " Type 1 batch alter encode2 failed for vgId:%d, code:%s",
               LOG_ID_VALUE, *pVgId, tstrerror(code));
        taosMemoryFree(pMsg);
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }

      // Query vgroup info for first table to get endpoint
      SUpdateTableTagVal* pFirstTable = taosArrayGet(pTables, 0);
      SVgroupInfo         vgInfo = {0};
      SName               pName = {0};
      toName(pTscObj->acctId, pRequest->pDb, pFirstTable->tbName, &pName);
      code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgInfo);
      if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
        taosMemoryFree(pMsg);
        pIter = taosHashIterate(pVgroupHashmap, pIter);
        code = TSDB_CODE_SUCCESS;
        continue;
      }
      if (code != TSDB_CODE_SUCCESS) {
        taosMemoryFree(pMsg);
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }

      // Create VgDataBlocks for this vnode
      pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
      if (pVgData == NULL) {
        code = terrno;
        taosMemoryFree(pMsg);
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }
      pVgData->vg = vgInfo;
      pVgData->pData = pMsg;  // pVgData owns pMsg from here on
      pVgData->size = tlen;
      pVgData->numOfTables = nTablesInVg;

      if (taosArrayPush(pArray, &pVgData) == NULL) {
        code = terrno;
        // Do not free pMsg here: it is released through pVgData->pData at the
        // end label, and freeing it in both places would be a double free.
        taosHashCancelIterate(pVgroupHashmap, pIter);
        goto end;
      }

      pVgData = NULL;  // Ownership transferred to pArray
      pIter = taosHashIterate(pVgroupHashmap, pIter);
    }

    uInfo(LOG_ID_TAG " Type 1 batch alter: grouped %d tables into %d vnodes",
          LOG_ID_VALUE, nTables, (int32_t)taosArrayGetSize(pArray));
  } else if (req.action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL) {
    // broadcast the same request to every vgroup of the database
    char  dbFName[TSDB_DB_FNAME_LEN] = {0};
    SName pName = {TSDB_TABLE_NAME_T, pTscObj->acctId, {0}, {0}};
    tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
    (void)tNameGetFullDbName(&pName, dbFName);
    RAW_RETURN_CHECK(catalogGetDBVgList(pCatalog, &conn, dbFName, &pVgList));

    pArray = taosArrayInit(taosArrayGetSize(pVgList), sizeof(void*));
    RAW_NULL_CHECK(pArray);
    for (int i = 0; i < taosArrayGetSize(pVgList); ++i) {
      SVgroupInfo* pInfo = (SVgroupInfo*)taosArrayGet(pVgList, i);
      pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
      RAW_NULL_CHECK(pVgData);
      pVgData->vg = *pInfo;

      int tlen = 0;
      req.source = TD_REQ_FROM_TAOX;

      tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
      RAW_RETURN_CHECK(code);
      tlen += sizeof(SMsgHead);
      void* pMsg = taosMemoryMalloc(tlen);
      RAW_NULL_CHECK(pMsg);
      // attach the buffer right away so that a failing encode below cannot leak it
      pVgData->pData = pMsg;
      pVgData->size = tlen;
      ((SMsgHead*)pMsg)->vgId = htonl(pInfo->vgId);
      ((SMsgHead*)pMsg)->contLen = htonl(tlen);
      void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
      tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
      RAW_RETURN_CHECK(tEncodeSVAlterTbReq(&coder, &req));
      tEncoderClear(&coder);

      pVgData->numOfTables = 1;
      RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));
      pVgData = NULL;  // Ownership transferred to pArray
    }
  } else {
    // Single table or Type 2 modification - original logic
    SVgroupInfo pInfo = {0};
    SName       pName = {0};
    toName(pTscObj->acctId, pRequest->pDb, req.tbName, &pName);
    code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &pInfo);
    if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
      code = TSDB_CODE_SUCCESS;
      goto end;
    }
    RAW_RETURN_CHECK(code);
    pArray = taosArrayInit(1, sizeof(void*));
    RAW_NULL_CHECK(pArray);

    pVgData = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
    RAW_NULL_CHECK(pVgData);
    pVgData->vg = pInfo;

    int tlen = 0;
    req.source = TD_REQ_FROM_TAOX;

    if (strlen(tmqWriteRefDB) > 0) {
      req.refDbName = tmqWriteRefDB;
    }

    if (req.action == TSDB_ALTER_TABLE_ALTER_COLUMN_REF && tmqWriteCheckRef) {
      RAW_RETURN_CHECK(checkColRefForAlter(pCatalog, &conn, pTscObj->acctId, req.refDbName, req.refTbName, req.refColName,
        pRequest->pDb, req.tbName, req.colName));
    } else if (req.action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COLUMN_REF && tmqWriteCheckRef) {
      RAW_RETURN_CHECK(checkColRefForAdd(pCatalog, &conn, pTscObj->acctId, req.refDbName, req.refTbName, req.refColName,
        pRequest->pDb, req.tbName, req.colName, req.type, req.bytes, req.typeMod));
    }

    tEncodeSize(tEncodeSVAlterTbReq, &req, tlen, code);
    RAW_RETURN_CHECK(code);
    tlen += sizeof(SMsgHead);
    void* pMsg = taosMemoryMalloc(tlen);
    RAW_NULL_CHECK(pMsg);
    // attach the buffer right away so that a failing encode below cannot leak it
    pVgData->pData = pMsg;
    pVgData->size = tlen;
    ((SMsgHead*)pMsg)->vgId = htonl(pInfo.vgId);
    ((SMsgHead*)pMsg)->contLen = htonl(tlen);
    void* pBuf = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
    tEncoderInit(&coder, pBuf, tlen - sizeof(SMsgHead));
    RAW_RETURN_CHECK(tEncodeSVAlterTbReq(&coder, &req));

    pVgData->numOfTables = 1;
    RAW_NULL_CHECK(taosArrayPush(pArray, &pVgData));
    pVgData = NULL;
  }

  pQuery = NULL;
  RAW_RETURN_CHECK(nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery));
  if (NULL == pQuery) goto end;
  pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
  pQuery->msgType = TDMT_VND_ALTER_TABLE;
  pQuery->stableQuery = false;
  pQuery->pRoot = NULL;
  RAW_RETURN_CHECK(nodesMakeNode(QUERY_NODE_ALTER_TABLE_STMT, &pQuery->pRoot));
  RAW_RETURN_CHECK(rewriteToVnodeModifyOpStmt(pQuery, pArray));

  launchQueryImpl(pRequest, pQuery, true, NULL);
  pArray = NULL;  // no longer released locally once the query has been launched

  code = pRequest->code;
  if (code == TSDB_CODE_TDB_TABLE_NOT_EXIST || code == TSDB_CODE_NOT_FOUND) {
    code = TSDB_CODE_SUCCESS;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
    if (pRes->res != NULL) {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
    }
  }
  uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code));

end:
  // Cleanup vnode grouping hashmap
  if (pVgroupHashmap != NULL) {
    void* pIter = taosHashIterate(pVgroupHashmap, NULL);
    while (pIter) {
      SArray* pTables = *(SArray**)pIter;
      taosArrayDestroy(pTables);
      pIter = taosHashIterate(pVgroupHashmap, pIter);
    }
    taosHashCleanup(pVgroupHashmap);
  }

  // Release every block still owned by pArray; a block without a buffer must
  // be freed as well.
  for (int i = 0; i < taosArrayGetSize(pArray); ++i) {
    SVgDataBlocks* pData = (SVgDataBlocks*)taosArrayGetP(pArray, i);
    if (pData) {
      taosMemoryFreeClear(pData->pData);
      taosMemoryFreeClear(pData);
    }
  }
  taosArrayDestroy(pArray);

  // block that was being built when an error occurred
  if (pVgData) {
    taosMemoryFreeClear(pVgData->pData);
    taosMemoryFreeClear(pVgData);
  }
  taosArrayDestroy(pVgList);
  destroyRequest(pRequest);
  tDecoderClear(&dcoder);
  qDestroyQuery(pQuery);
  destroyAlterTbReq(&req);
  tEncoderClear(&coder);
  RAW_LOG_END
  return code;
}
1102

1103
/**
 * Write a raw data block into table tbname, using the given field
 * descriptions.
 *
 * Convenience wrapper around taos_write_raw_block_with_fields_with_reqid()
 * that passes 0 as the request id.
 *
 * @return the result of taos_write_raw_block_with_fields_with_reqid().
 */
int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields,
                                     int numFields) {
  const int64_t reqid = 0;
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, fields, numFields, reqid);
}
1107

1108
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
2,488✔
1109
                                                TAOS_FIELD* fields, int numFields, int64_t reqid) {
1110
  if (taos == NULL || pData == NULL || tbname == NULL) {
2,488✔
UNCOV
1111
    uError("invalid parameter in %s, taos:%p, pData:%p, tbname:%p", __func__, taos, pData, tbname);
×
1112
    return TSDB_CODE_INVALID_PARA;
×
1113
  }
1114
  int32_t     code = TSDB_CODE_SUCCESS;
2,488✔
1115
  int32_t     lino = 0;
2,488✔
1116
  STableMeta* pTableMeta = NULL;
2,488✔
1117
  SQuery*     pQuery = NULL;
2,488✔
1118
  SHashObj*   pVgHash = NULL;
2,488✔
1119

1120
  SRequestObj* pRequest = NULL;
2,488✔
1121
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, reqid));
2,488✔
1122

1123
  uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", LOG_ID_VALUE,
2,488✔
1124
         rows, pData, tbname, fields, numFields);
1125

1126
  pRequest->syncQuery = true;
2,488✔
1127
  if (!pRequest->pDb) {
2,488✔
UNCOV
1128
    uError(LOG_ID_TAG " %s no database selected", LOG_ID_VALUE, __func__);
×
1129
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
×
1130
    goto end;
×
1131
  }
1132

1133
  SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
2,488✔
1134
  tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
2,488✔
1135
  tstrncpy(pName.tname, tbname, sizeof(pName.tname));
2,488✔
1136

1137
  struct SCatalog* pCatalog = NULL;
2,488✔
1138
  RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog));
2,488✔
1139

1140
  SRequestConnInfo conn = {0};
2,488✔
1141
  conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter;
2,488✔
1142
  conn.requestId = pRequest->requestId;
2,488✔
1143
  conn.requestObjRefId = pRequest->self;
2,488✔
1144
  conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
2,488✔
1145

1146
  SVgroupInfo vgData = {0};
2,488✔
1147
  RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData));
2,488✔
1148
  RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta));
2,488✔
1149
  uDebug(LOG_ID_TAG " write raw block got meta, tbname:%s, numOfColumns:%d", LOG_ID_VALUE,
2,177✔
1150
         tbname, pTableMeta->tableInfo.numOfColumns);
1151
  RAW_RETURN_CHECK(smlInitHandle(&pQuery));
2,177✔
1152
  pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
2,177✔
1153
  RAW_NULL_CHECK(pVgHash);
2,177✔
1154
  RAW_RETURN_CHECK(
2,177✔
1155
      taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)));
1156
  RAW_RETURN_CHECK(rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false, NULL, 0, false));
2,177✔
1157
  RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
1,555✔
1158

1159
  launchQueryImpl(pRequest, pQuery, true, NULL);
1,555✔
1160
  code = pRequest->code;
1,555✔
1161
  uDebug(LOG_ID_TAG " write raw block return, tbname:%s, msg:%s", LOG_ID_VALUE, tbname, tstrerror(code));
1,555✔
1162

1163
end:
2,488✔
1164
  taosMemoryFreeClear(pTableMeta);
2,488✔
1165
  qDestroyQuery(pQuery);
2,488✔
1166
  destroyRequest(pRequest);
2,488✔
1167
  taosHashCleanup(pVgHash);
2,488✔
1168
  RAW_LOG_END
2,488✔
1169
  return code;
2,488✔
1170
}
1171

1172
/**
 * Write a raw data block into table tbname without field descriptions.
 *
 * Convenience wrapper around taos_write_raw_block_with_fields_with_reqid()
 * that passes no fields (NULL, 0) and 0 as the request id.
 *
 * @return the result of taos_write_raw_block_with_fields_with_reqid().
 */
int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) {
  const int64_t reqid = 0;
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, NULL, 0, reqid);
}
1175

UNCOV
1176
/**
 * Write a raw data block into table tbname without field descriptions, using
 * the caller-supplied request id.
 *
 * Convenience wrapper around taos_write_raw_block_with_fields_with_reqid()
 * that passes no fields (NULL, 0).
 *
 * @return the result of taos_write_raw_block_with_fields_with_reqid().
 */
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
  TAOS_FIELD* const noFields = NULL;
  return taos_write_raw_block_with_fields_with_reqid(taos, rows, pData, tbname, noFields, 0, reqid);
}
1179

1180
static void* getRawDataFromRes(void* pRetrieve) {
53,924✔
1181
  if (pRetrieve == NULL) {
53,924✔
UNCOV
1182
    uError("invalid parameter in %s", __func__);
×
1183
    return NULL;
×
1184
  }
1185
  void* rawData = NULL;
53,924✔
1186
  // deal with compatibility
1187
  if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_VERSION) {
53,924✔
UNCOV
1188
    rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
×
1189
  } else if (*(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_VERSION ||
53,924✔
UNCOV
1190
             *(int64_t*)pRetrieve == RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION) {
×
1191
    rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
53,924✔
1192
  }
1193
  return rawData;
53,924✔
1194
}
1195

1196
/**
 * Decode every create-table request carried by a data response and store it in
 * pHashObj, keyed by table name.
 *
 * Only child-table requests are accepted. If a name is already present in the
 * map, the newly decoded duplicate is destroyed and the existing entry is kept.
 * A request stored in the map is copied by value, so the map takes over what
 * the decoded request references.
 *
 * @param rsp       response holding createTableNum encoded requests
 * @param pHashObj  destination map: table name -> SVCreateTbReq
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments,
 *         TSDB_CODE_INVALID_MSG for a non-child-table request, otherwise the
 *         first decode or hash error.
 */
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
  if (rsp == NULL || pHashObj == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  // find schema data info
  int32_t       code = 0;
  int32_t       lino = 0;
  SVCreateTbReq createReq = {0};
  SDecoder      decoder = {0};
  RAW_LOG_START
  for (int idx = 0; idx < rsp->createTableNum; idx++) {
    void** ppEncoded = taosArrayGet(rsp->createTableReq, idx);
    RAW_NULL_CHECK(ppEncoded);
    int32_t* pEncodedLen = taosArrayGet(rsp->createTableLen, idx);
    RAW_NULL_CHECK(pEncodedLen);

    tDecoderInit(&decoder, *ppEncoded, *pEncodedLen);
    RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoder, &createReq));

    if (createReq.type != TSDB_CHILD_TABLE) {
      uError("invalid table type %d in %s", createReq.type, __func__);
      code = TSDB_CODE_INVALID_MSG;
      goto end;
    }

    size_t nameLen = strlen(createReq.name);
    if (taosHashGet(pHashObj, createReq.name, nameLen) != NULL) {
      // duplicate name: keep the entry already in the map
      tDestroySVCreateTbReq(&createReq, TSDB_MSG_FLG_DECODE);
    } else {
      RAW_RETURN_CHECK(taosHashPut(pHashObj, createReq.name, nameLen, &createReq, sizeof(SVCreateTbReq)));
    }

    tDecoderClear(&decoder);
    // reset so the cleanup at "end" does not touch a request the map now holds
    createReq = (SVCreateTbReq){0};
  }

end:
  tDecoderClear(&decoder);
  tDestroySVCreateTbReq(&createReq, TSDB_MSG_FLG_DECODE);
  RAW_LOG_END
  return code;
}
1238

1239
// Initialization states; WRITE_RAW_INIT_START is the initial value of initedFlag below.
// NOTE(review): the code that moves between these states is not visible in this part of the file.
typedef enum {
1240
  WRITE_RAW_INIT_START = 0,  // initial state
1241
  WRITE_RAW_INIT_OK,  // presumably: initialization finished successfully - confirm at the point of use
1242
  WRITE_RAW_INIT_FAIL,  // presumably: initialization failed - confirm at the point of use
1243
} WRITE_RAW_INIT_STATUS;
1244

1245
// Process-wide cache created lazily by initRawCacheHash(); getRawCache() stores
// one rawCacheInfo per key (a pointer value) in it, and freeRawCache() is its
// free callback.
static SHashObj* writeRawCache = NULL;
1246
// NOTE(review): initFlag / initedFlag are presumably the guard and the result of
// the one-time cache initialization; their users are not visible here - confirm.
static int8_t    initFlag = 0;
1247
static int8_t    initedFlag = WRITE_RAW_INIT_START;
1248

1249
// Value type stored in writeRawCache: the three hash tables that getRawCache()
// creates together for one key and that freeRawCache() destroys together.
typedef struct {
1250
  SHashObj* pVgHash;  // created with the INT default hash function
1251
  SHashObj* pNameHash;  // created with the BINARY default hash function
1252
  SHashObj* pMetaHash;  // created with the BIGINT default hash function; entries are freed by tmqFreeMeta
1253
} rawCacheInfo;
1254

1255
// Per-table record combining the hosting vgroup with the table's identifiers.
// NOTE(review): the code that fills and reads this struct is not visible here;
// presumably it is the value type of the name hash - confirm.
typedef struct {
1256
  SVgroupInfo vgInfo;  // vgroup info for the table
1257
  int64_t     uid;  // presumably the table uid - confirm
1258
  int64_t     suid;  // presumably the super table uid - confirm
1259
} tbInfo;
1260

1261
static void tmqFreeMeta(void* data) {
14,773✔
1262
  if (data == NULL) {
14,773✔
UNCOV
1263
    uError("invalid parameter in %s", __func__);
×
1264
    return;
×
1265
  }
1266
  STableMeta* pTableMeta = *(STableMeta**)data;
14,773✔
1267
  taosMemoryFree(pTableMeta);
14,773✔
1268
}
1269

UNCOV
1270
static void freeRawCache(void* data) {
×
1271
  if (data == NULL) {
×
1272
    uError("invalid parameter in %s", __func__);
×
1273
    return;
×
1274
  }
UNCOV
1275
  rawCacheInfo* pRawCache = (rawCacheInfo*)data;
×
1276
  taosHashCleanup(pRawCache->pMetaHash);
×
1277
  taosHashCleanup(pRawCache->pNameHash);
×
1278
  taosHashCleanup(pRawCache->pVgHash);
×
1279
}
1280

1281
static int32_t initRawCacheHash() {
7,388✔
1282
  if (writeRawCache == NULL) {
7,388✔
1283
    writeRawCache = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
7,388✔
1284
    if (writeRawCache == NULL) {
7,388✔
UNCOV
1285
      return terrno;
×
1286
    }
1287
    taosHashSetFreeFp(writeRawCache, freeRawCache);
7,388✔
1288
  }
1289
  return 0;
7,388✔
1290
}
1291

1292
/**
 * Decide whether the cached table meta no longer matches the schema of an
 * incoming raw block.
 *
 * @param rawData     raw block; its column schema section follows the header
 * @param pTableMeta  cached table meta to compare against
 * @param pSW         schema wrapper describing the block's columns
 * @return true when the column counts differ, when a block column has no
 *         same-named column in the meta, or when checkSchema() reports a
 *         mismatch for a matched column; false otherwise, and also when any
 *         argument is NULL.
 */
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
  if (rawData == NULL || pSW == NULL) {
    return false;
  }
  if (pTableMeta == NULL) {
    uError("invalid parameter in %s", __func__);
    return false;
  }

  // Raw block header layout:
  // | version(int32) | totalLen(int32) | totalRows(int32) | blankFill(int32) | totalCols(int32) | flagSeg(uint64) | columnSchema... | colLen... |
  int32_t rawBlockHeaderSize = sizeof(int32_t) * 5 + sizeof(uint64_t);
  int8_t* fields = (int8_t*)((char*)rawData + rawBlockHeaderSize);

  if (pSW->nCols != pTableMeta->tableInfo.numOfColumns) {
    return true;
  }

  // every block column must exist in the meta under the same name and pass checkSchema
  for (int col = 0; col < pSW->nCols; col++) {
    char* fieldName = pSW->pSchema[col].name;
    bool  matched = false;

    for (int m = 0; m < pTableMeta->tableInfo.numOfColumns; m++) {
      SSchema* pColSchema = &pTableMeta->schema[m];
      if (strcmp(pColSchema->name, fieldName) != 0) {
        continue;
      }
      if (checkSchema(pColSchema, &pTableMeta->schemaExt[m], fields, NULL, 0) != 0) {
        return true;
      }
      matched = true;
      break;
    }

    if (!matched) {
      return true;
    }
    // advance to the next column schema entry: type (int8) followed by bytes (int32)
    fields += sizeof(int8_t) + sizeof(int32_t);
  }
  return false;
}
1331

1332
// Fetch (or lazily create) the three per-key caches stored in writeRawCache.
//
// pVgHash/pNameHash/pMetaHash - out parameters receiving the cached hash objects.
// key                         - lookup key; the pointer value itself (POINTER_BYTES) is hashed.
//
// On a miss, three new hashes are created (pMetaHash gets tmqFreeMeta as its free
// callback) and registered in writeRawCache. On a hit the stored handles are returned.
//
// Returns 0 on success. On failure every partially created hash is destroyed and all
// three out parameters are reset to NULL so the caller never sees a dangling handle.
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t lino = 0;
  RAW_LOG_START
  void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
  if (cacheInfo == NULL) {
    // Start from a known state so the cleanup below only ever touches hashes created here,
    // regardless of what the caller left in the out parameters.
    *pVgHash = NULL;
    *pNameHash = NULL;
    *pMetaHash = NULL;

    *pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(*pVgHash);
    *pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(*pNameHash);
    *pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(*pMetaHash);
    taosHashSetFreeFp(*pMetaHash, tmqFreeMeta);
    rawCacheInfo info = {*pVgHash, *pNameHash, *pMetaHash};
    RAW_RETURN_CHECK(taosHashPut(writeRawCache, &key, POINTER_BYTES, &info, sizeof(rawCacheInfo)));
  } else {
    rawCacheInfo* info = (rawCacheInfo*)cacheInfo;
    *pVgHash = info->pVgHash;
    *pNameHash = info->pNameHash;
    *pMetaHash = info->pMetaHash;
  }

end:
  if (code != 0) {
    // Only the miss path can fail, so these are the hashes created above (or NULL).
    taosHashCleanup(*pMetaHash);
    taosHashCleanup(*pNameHash);
    taosHashCleanup(*pVgHash);
    *pMetaHash = NULL;
    *pNameHash = NULL;
    *pVgHash = NULL;
  }
  RAW_LOG_END
  return code;
}
1367

1368
// Create a synchronous request object for a raw write and fill in the catalog
// handle and connection info derived from it.
//
// taos     - connection handle; its first 8 bytes are read as the connection id.
// pRequest - out: request created by buildRequest(); left for the caller to destroy,
//            including on failure.
// pCatalog - out: catalog handle for the request's cluster.
// conn     - out: transport, request id, request ref id and mgmt ep set.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments,
// TSDB_CODE_PAR_DB_NOT_SPECIFIED when the request has no database, or the error
// code of the failing helper.
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
  int32_t      code = 0;
  int32_t      lino = 0;
  SRequestObj* req = NULL;

  if (!taos || !pRequest || !pCatalog || !conn) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  RAW_LOG_START
  RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));

  req = *pRequest;
  req->syncQuery = true;
  if (req->pDb == NULL) {
    uError("%s no database selected", __func__);
    code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
    goto end;
  }

  RAW_RETURN_CHECK(catalogGetHandle(req->pTscObj->pAppInfo->clusterId, pCatalog));

  // Everything the catalog calls need to reach the server on behalf of this request.
  conn->pTrans = req->pTscObj->pAppInfo->pTransporter;
  conn->requestId = req->requestId;
  conn->requestObjRefId = req->self;
  conn->mgmtEps = getEpSet_s(&req->pTscObj->pAppInfo->mgmtEp);

end:
  RAW_LOG_END
  return code;
}
1394

1395
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
1396
static int32_t  decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func,
23,904✔
1397
                              SMqRspObj* rspObj) {
1398
  if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) {
23,904✔
UNCOV
1399
    uError("invalid parameter in %s", __func__);
×
1400
    return TSDB_CODE_INVALID_PARA;
×
1401
  }
1402
  int8_t dataVersion = *(int8_t*)data;
23,904✔
1403
  if (dataVersion >= MQ_DATA_RSP_VERSION) {
23,904✔
1404
    data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
23,904✔
1405
    if (dataLen < sizeof(int8_t) + sizeof(int32_t)) {
23,904✔
UNCOV
1406
      return TSDB_CODE_INVALID_PARA;
×
1407
    }
1408
    dataLen -= sizeof(int8_t) + sizeof(int32_t);
23,904✔
1409
  }
1410

1411
  rspObj->resIter = -1;
23,904✔
1412
  tDecoderInit(decoder, data, dataLen);
23,904✔
1413
  int32_t code = func(decoder, &rspObj->dataRsp);
23,904✔
1414
  if (code != 0) {
23,904✔
UNCOV
1415
    SET_ERROR_MSG("decode mq taosx data rsp failed");
×
1416
  }
1417
  return code;
23,904✔
1418
}
1419

1420
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
53,924✔
1421
                                SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
1422
                                STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
1423
  if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
53,924✔
1424
      pMeta == NULL) {
UNCOV
1425
    uError("invalid parameter in %s", __func__);
×
1426
    return TSDB_CODE_INVALID_PARA;
×
1427
  }
1428
  int32_t code = 0;
53,924✔
1429
  int32_t lino = 0;
53,924✔
1430
  RAW_LOG_START
53,924✔
1431
  STableMeta* pTableMeta = NULL;
53,924✔
1432
  tbInfo*     tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
53,924✔
1433
  uDebug("%s tname:%s, cacheHit:%d, retry:%d, hasCreateReq:%d", __func__,
53,924✔
1434
         pName->tname, tmpInfo != NULL, retry, pCreateReqDst != NULL);
1435
  if (tmpInfo == NULL || retry > 0) {
53,924✔
1436
    tbInfo info = {0};
49,859✔
1437

1438
    RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, conn, pName, &info.vgInfo));
49,859✔
1439
    if (pCreateReqDst && tmpInfo == NULL) {  // change stable name to get meta
49,859✔
1440
      tstrncpy(pName->tname, pCreateReqDst->ctb.stbName, TSDB_TABLE_NAME_LEN);
5,170✔
1441
    }
1442
    RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
49,859✔
1443
    info.uid = pTableMeta->uid;
48,329✔
1444
    if (pTableMeta->tableType == TSDB_CHILD_TABLE) {
48,329✔
1445
      info.suid = pTableMeta->suid;
37,010✔
1446
    } else {
1447
      info.suid = pTableMeta->uid;
11,319✔
1448
    }
1449
    code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
48,329✔
1450
    RAW_RETURN_CHECK(code);
48,329✔
1451

1452
    uDebug("put table meta to hash1, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d", info.suid,
48,329✔
1453
           taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
1454
    if (pCreateReqDst) {
48,329✔
1455
      pTableMeta->vgId = info.vgInfo.vgId;
5,170✔
1456
      pTableMeta->uid = pCreateReqDst->uid;
5,170✔
1457
      pCreateReqDst->ctb.suid = pTableMeta->suid;
5,170✔
1458
    }
1459

1460
    RAW_RETURN_CHECK(taosHashPut(pNameHash, pName->tname, strlen(pName->tname), &info, sizeof(tbInfo)));
48,329✔
1461
    tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
48,329✔
1462
    RAW_RETURN_CHECK(
48,329✔
1463
        taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)));
1464
  }
1465

1466
  if (pTableMeta == NULL || retry > 0) {
52,394✔
1467
    STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES);
4,065✔
1468
    if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) {
4,065✔
1469
      RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, conn, pName, &pTableMeta));
1,557✔
1470
      code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES);
1,557✔
1471
      RAW_RETURN_CHECK(code);
1,557✔
1472
      uDebug("put table meta to hash2, suid:%" PRId64 ", metaHashSIze:%d, nameHashSize:%d, vgHashSize:%d",
1,557✔
1473
             tmpInfo->suid, taosHashGetSize(pMetaHash), taosHashGetSize(pNameHash), taosHashGetSize(pVgHash));
1474
    } else {
1475
      pTableMeta = *pTableMetaTmp;
2,508✔
1476
      pTableMeta->uid = tmpInfo->uid;
2,508✔
1477
      pTableMeta->vgId = tmpInfo->vgInfo.vgId;
2,508✔
1478
    }
1479
  }
1480
  *pMeta = pTableMeta;
52,394✔
1481
  pTableMeta = NULL;
52,394✔
1482

1483
end:
53,924✔
1484
  taosMemoryFree(pTableMeta);
53,924✔
1485
  RAW_LOG_END
53,924✔
1486
  return code;
53,924✔
1487
}
1488

1489
static int32_t tmqWriteRawCommon(TAOS* taos, void* data, uint32_t dataLen,
23,904✔
1490
                                 _raw_decode_func_ decodeFunc, bool withMeta) {
1491
  if (taos == NULL || data == NULL) {
23,904✔
UNCOV
1492
    uError("invalid parameter in %s, taos:%p, data:%p, withMeta:%d", __func__, taos, data, withMeta);
×
1493
    return TSDB_CODE_INVALID_PARA;
×
1494
  }
1495
  int32_t   code = TSDB_CODE_SUCCESS;
23,904✔
1496
  int32_t   lino = 0;
23,904✔
1497
  SQuery*   pQuery = NULL;
23,904✔
1498
  SMqRspObj rspObj = {0};
23,904✔
1499
  SDecoder  decoder = {0};
23,904✔
1500
  SHashObj* pCreateTbHash = NULL;
23,904✔
1501

1502
  SRequestObj*     pRequest = NULL;
23,904✔
1503
  SCatalog*        pCatalog = NULL;
23,904✔
1504
  SRequestConnInfo conn = {0};
23,904✔
1505
  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
23,904✔
1506
  uDebug(LOG_ID_TAG " write raw %s, data:%p, dataLen:%d", LOG_ID_VALUE, withMeta ? "metadata" : "data", data, dataLen);
23,904✔
1507
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, decodeFunc, &rspObj));
23,904✔
1508

1509
  if (withMeta) {
23,904✔
1510
    pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
3,210✔
1511
    RAW_NULL_CHECK(pCreateTbHash);
3,210✔
1512
    RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash));
3,210✔
1513
  }
1514

1515
  SHashObj* pVgHash = NULL;
23,904✔
1516
  SHashObj* pNameHash = NULL;
23,904✔
1517
  SHashObj* pMetaHash = NULL;
23,904✔
1518
  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));
23,904✔
1519
  int retry = 0;
23,904✔
1520
  while (1) {
1521
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
23,904✔
1522
    uDebug(LOG_ID_TAG " write raw %s block num:%d, retry:%d", LOG_ID_VALUE,
23,904✔
1523
           withMeta ? "metadata" : "data", rspObj.dataRsp.blockNum, retry);
1524
    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
77,828✔
1525
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
53,924✔
1526
      RAW_NULL_CHECK(tbName);
53,924✔
1527
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
53,924✔
1528
      RAW_NULL_CHECK(pSW);
53,924✔
1529
      void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
53,924✔
1530
      RAW_NULL_CHECK(pRetrieve);
53,924✔
1531
      void* rawData = getRawDataFromRes(pRetrieve);
53,924✔
1532
      RAW_NULL_CHECK(rawData);
53,924✔
1533

1534
      uTrace(LOG_ID_TAG " write raw %s block[%d] tbname:%s, schemaCols:%d", LOG_ID_VALUE,
53,924✔
1535
             withMeta ? "metadata" : "data", rspObj.resIter, tbName, pSW->nCols);
1536
      SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
53,924✔
1537
      tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
53,924✔
1538
      tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
53,924✔
1539

1540
      SVCreateTbReq* pCreateReqDst = NULL;
53,924✔
1541
      if (pCreateTbHash) {
53,924✔
1542
        pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, pName.tname, strlen(pName.tname));
8,634✔
1543
      }
1544
      STableMeta* pTableMeta = NULL;
53,924✔
1545
      code = processCacheMeta(pVgHash, pNameHash, pMetaHash, pCreateReqDst, pCatalog, &conn, &pName,
53,924✔
1546
                              &pTableMeta, pSW, rawData, retry);
1547
      PROCESS_TABLE_NOT_EXIST(code, tbName)
53,924✔
1548
      char err[ERR_MSG_LEN] = {0};
52,394✔
1549
      code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true);
52,394✔
1550
      if (code != TSDB_CODE_SUCCESS) {
52,394✔
UNCOV
1551
        uError(LOG_ID_TAG " rawBlockBindData failed, table:%s, err:%s, code:%s", LOG_ID_VALUE, pName.tname, err, tstrerror(code));
×
1552
        SET_ERROR_MSG("table:%s, err:%s", pName.tname, err);
×
1553
        goto end;
×
1554
      }
1555
    }
1556
    if (taosHashGetSize(pVgHash) == 0) {
23,904✔
1557
      goto end;
1,224✔
1558
    }
1559
    RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash));
22,680✔
1560
    launchQueryImpl(pRequest, pQuery, true, NULL);
22,680✔
1561
    code = pRequest->code;
22,680✔
1562

1563
    if (NEED_CLIENT_HANDLE_ERROR(code) && retry++ < 3) {
22,680✔
UNCOV
1564
      uInfo(LOG_ID_TAG " write raw retry:%d/3, code:%d, msg:%s", LOG_ID_VALUE, retry, code, tstrerror(code));
×
1565
      qDestroyQuery(pQuery);
×
1566
      pQuery = NULL;
×
1567
      rspObj.resIter = -1;
×
1568
      continue;
×
1569
    }
1570
    break;
22,680✔
1571
  }
1572
  uDebug(LOG_ID_TAG " write raw %s return, msg:%s", LOG_ID_VALUE, withMeta ? "metadata" : "data", tstrerror(code));
22,680✔
1573

1574
end:
23,904✔
1575
  if (withMeta) {
23,904✔
1576
    tDeleteSTaosxRsp(&rspObj.dataRsp);
3,210✔
1577
    void* pIter = taosHashIterate(pCreateTbHash, NULL);
3,210✔
1578
    while (pIter) {
8,380✔
1579
      tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE);
5,170✔
1580
      pIter = taosHashIterate(pCreateTbHash, pIter);
5,170✔
1581
    }
1582
    taosHashCleanup(pCreateTbHash);
3,210✔
1583
  } else {
1584
    tDeleteMqDataRsp(&rspObj.dataRsp);
20,694✔
1585
  }
1586
  tDecoderClear(&decoder);
23,904✔
1587
  qDestroyQuery(pQuery);
23,904✔
1588
  destroyRequest(pRequest);
23,904✔
1589
  RAW_LOG_END
23,904✔
1590
  return code;
23,904✔
1591
}
1592

1593
// Write a plain data response: tmqWriteRawCommon() with the tDecodeMqDataRsp decoder
// and no create-table metadata.
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  const bool withMeta = false;
  return tmqWriteRawCommon(taos, data, dataLen, tDecodeMqDataRsp, withMeta);
}
1596

1597
// Write a data response that also carries metadata: tmqWriteRawCommon() with the
// tDecodeSTaosxRsp decoder and create-table handling enabled.
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  const bool withMeta = true;
  return tmqWriteRawCommon(taos, data, dataLen, tDecodeSTaosxRsp, withMeta);
}
1600

UNCOV
1601
// Write a RES_TYPE__TMQ_RAWDATA payload.
//
// The payload is decoded with tDecodeMqDataRsp; each block is bound with
// rawBlockBindRawData() into the statement's pVgDataBlocks, the query is built with
// smlBuildOutputRaw() and launched. The whole pass is repeated (at most 3 extra times)
// while the resulting code satisfies NEED_CLIENT_HANDLE_ERROR.
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise the final error code.
static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
  if (taos == NULL || data == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int              attempt = 0;
  SQuery*          pQuery = NULL;
  SRequestObj*     pRequest = NULL;
  SCatalog*        pCatalog = NULL;
  SHashObj*        pVgroupHash = NULL;  // per-pass scratch hash, rebuilt on every attempt
  SHashObj*        pVgHash = NULL;
  SHashObj*        pNameHash = NULL;
  SHashObj*        pMetaHash = NULL;
  SMqRspObj        rspObj = {0};
  SDecoder         decoder = {0};
  SRequestConnInfo conn = {0};

  RAW_RETURN_CHECK(buildRawRequest(taos, &pRequest, &pCatalog, &conn));
  uDebug(LOG_ID_TAG " write raw rawdata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen);
  RAW_RETURN_CHECK(decodeRawData(&decoder, data, dataLen, tDecodeMqDataRsp, &rspObj));

  RAW_RETURN_CHECK(getRawCache(&pVgHash, &pNameHash, &pMetaHash, taos));

  for (;;) {
    RAW_RETURN_CHECK(smlInitHandle(&pQuery));
    uDebug(LOG_ID_TAG " write raw rawdata block num:%d", LOG_ID_VALUE, rspObj.dataRsp.blockNum);
    SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(pQuery)->pRoot;
    pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
    RAW_NULL_CHECK(pVgroupHash);
    pStmt->pVgDataBlocks = taosArrayInit(8, POINTER_BYTES);
    RAW_NULL_CHECK(pStmt->pVgDataBlocks);

    while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
      const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
      RAW_NULL_CHECK(tbName);
      void* pBlock = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
      RAW_NULL_CHECK(pBlock);
      void* pRaw = getRawDataFromRes(pBlock);
      RAW_NULL_CHECK(pRaw);

      uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
      SName tblName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
      tstrncpy(tblName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
      tstrncpy(tblName.tname, tbName, TSDB_TABLE_NAME_LEN);

      // Resolve the table meta; no schema wrapper or raw data is supplied here, so
      // needRefreshMeta() is effectively disabled for this path.
      STableMeta* pTableMeta = NULL;
      code = processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &tblName, &pTableMeta, NULL,
                              NULL, attempt);
      PROCESS_TABLE_NOT_EXIST(code, tbName)

      char bindErr[ERR_MSG_LEN] = {0};
      code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, pRaw);
      if (code != TSDB_CODE_SUCCESS) {
        SET_ERROR_MSG("table:%s, err:%s", tblName.tname, bindErr);
        goto end;
      }
    }
    taosHashCleanup(pVgroupHash);
    pVgroupHash = NULL;

    RAW_RETURN_CHECK(smlBuildOutputRaw(pQuery, pVgHash));
    launchQueryImpl(pRequest, pQuery, true, NULL);
    code = pRequest->code;

    // Stop unless the error is one the client handles itself and retries remain.
    if (!(NEED_CLIENT_HANDLE_ERROR(code)) || attempt++ >= 3) {
      break;
    }
    uInfo("write raw retry:%d/3 end code:%d, msg:%s", attempt, code, tstrerror(code));
    qDestroyQuery(pQuery);
    pQuery = NULL;
    rspObj.resIter = -1;  // replay every block on the next pass
  }
  uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));

end:
  tDeleteMqDataRsp(&rspObj.dataRsp);
  tDecoderClear(&decoder);
  qDestroyQuery(pQuery);
  taosHashCleanup(pVgroupHash);
  destroyRequest(pRequest);
  RAW_LOG_END
  return code;
}
1687

1688
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
23,946✔
1689
  if (pRsp == NULL) {
23,946✔
UNCOV
1690
    uError("invalid parameter in %s", __func__);
×
1691
    return TSDB_CODE_INVALID_PARA;
×
1692
  }
1693
  int32_t pos = 0;
23,946✔
1694
  int32_t code = 0;
23,946✔
1695
  int32_t lino = 0;
23,946✔
1696
  RAW_LOG_START
23,946✔
1697
  SEncoder coder = {0};
23,946✔
1698
  tEncoderInit(&coder, NULL, 0);
23,946✔
1699
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset));
23,946✔
1700
  RAW_RETURN_CHECK(tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset));
23,946✔
1701
  pos = coder.pos;
23,946✔
1702
  tEncoderClear(&coder);
23,946✔
1703

1704
end:
23,946✔
1705
  if (code != 0) {
23,946✔
UNCOV
1706
    uError("getOffSetLen failed, code:%d", code);
×
1707
    return code;
×
1708
  } else {
1709
    uDebug("getOffSetLen success, len:%d", pos);
23,946✔
1710
    return pos;
23,946✔
1711
  }
1712
}
1713

1714
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
1715
// Serialize rspObj into a newly allocated buffer and hand it to `raw`.
//
// Buffer layout: int8 MQ_DATA_RSP_VERSION, int32 encoded length of the two offsets
// (from getOffSetLen()), then the body written by encodeFunc.
//
// encodeFunc - encoder for the response body; also used to size the buffer.
// rspObj     - response to serialize.
// raw        - out: on success raw->raw owns the buffer and raw->raw_len is its size.
//              Left untouched on failure.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the error code
// of the failing step. The buffer is released here on every failure path.
static int32_t  encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
  if (raw == NULL || encodeFunc == NULL || rspObj == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  uint32_t len = 0;
  int32_t  code = 0;
  int32_t  lino = 0;
  SEncoder encoder = {0};
  void*    buf = NULL;
  tEncodeSize(encodeFunc, rspObj, len, code);
  RAW_FALSE_CHECK(code >= 0);
  len += sizeof(int8_t) + sizeof(int32_t);  // room for the version byte and the offset length
  buf = taosMemoryCalloc(1, len);
  RAW_NULL_CHECK(buf);
  tEncoderInit(&encoder, buf, len);
  RAW_RETURN_CHECK(tEncodeI8(&encoder, MQ_DATA_RSP_VERSION));
  int32_t offsetLen = getOffSetLen(rspObj);
  RAW_FALSE_CHECK(offsetLen > 0);
  RAW_RETURN_CHECK(tEncodeI32(&encoder, offsetLen));
  RAW_RETURN_CHECK(encodeFunc(&encoder, rspObj));

  raw->raw = buf;
  buf = NULL;  // ownership moved to raw; the free below becomes a no-op
  raw->raw_len = len;

end:
  // NOTE(review): the encoder may still be in its zero-initialized state when an early
  // check fails; this mirrors how tmqWriteRawCommon clears a zero-initialized decoder.
  tEncoderClear(&encoder);
  taosMemoryFree(buf);  // non-NULL only when a step after the allocation failed
  RAW_LOG_END
  return code;
}
1745

1746
// Export a consumed TMQ result as a raw payload.
//
// res - result object, interpreted as an SMqRspObj.
// raw - out: zeroed first, then filled according to the result type:
//   meta       - points at the response's meta buffer (length clamped to >= 0);
//   data       - freshly encoded buffer from encodeMqDataRsp(tEncodeMqDataRsp);
//   metadata   - freshly encoded buffer from encodeMqDataRsp(tEncodeSTaosxRsp);
//   batch meta - points at the response's batch meta buffer;
//   raw        - takes over dataRsp.rawData (the field is set to NULL in the response).
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments,
// TSDB_CODE_TMQ_INVALID_MSG for an unrecognized result type, or the encode error.
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
  if (res == NULL || raw == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  RAW_LOG_START
  *raw = (tmq_raw_data){0};
  SMqRspObj* rspObj = ((SMqRspObj*)res);
  uDebug("tmq_get_raw resType:%d", rspObj->resType);

  if (TD_RES_TMQ_META(res)) {
    raw->raw = rspObj->metaRsp.metaRsp;
    if (rspObj->metaRsp.metaRspLen >= 0) {
      raw->raw_len = rspObj->metaRsp.metaRspLen;
    } else {
      raw->raw_len = 0;
    }
    raw->raw_type = rspObj->metaRsp.resMsgType;
    uDebug("tmq get raw type meta:%p", raw);
    goto end;
  }

  if (TD_RES_TMQ(res)) {
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->dataRsp, raw));
    raw->raw_type = RES_TYPE__TMQ;
    uDebug("tmq get raw type data:%p", raw);
    goto end;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    RAW_RETURN_CHECK(encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->dataRsp, raw));
    raw->raw_type = RES_TYPE__TMQ_METADATA;
    uDebug("tmq get raw type metadata:%p", raw);
    goto end;
  }

  if (TD_RES_TMQ_BATCH_META(res)) {
    raw->raw = rspObj->batchMetaRsp.pMetaBuff;
    raw->raw_len = rspObj->batchMetaRsp.metaBuffLen;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw batch meta:%p", raw);
    goto end;
  }

  if (TD_RES_TMQ_RAW(res)) {
    raw->raw = rspObj->dataRsp.rawData;
    rspObj->dataRsp.rawData = NULL;  // the buffer now belongs to `raw`
    raw->raw_len = rspObj->dataRsp.len;
    raw->raw_type = rspObj->resType;
    uDebug("tmq get raw raw:%p", raw);
    goto end;
  }

  uError("tmq get raw error type:%d", *(int8_t*)res);
  code = TSDB_CODE_TMQ_INVALID_MSG;

end:
  RAW_LOG_END
  return code;
}
1790

1791
// Release a payload obtained from tmq_get_raw() and clear the global error message.
//
// RES_TYPE__TMQ / RES_TYPE__TMQ_METADATA: raw.raw is freed directly.
// RES_TYPE__TMQ_RAWDATA: the pointer sizeof(SMqRspHead) bytes before raw.raw is freed.
//   NOTE(review): presumably raw.raw points just past an SMqRspHead at the start of the
//   allocation — confirm against the producer of dataRsp.rawData.
// Any other type: nothing is freed here.
void tmq_free_raw(tmq_raw_data raw) {
  uDebug("tmq free raw data type:%d", raw.raw_type);
  if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) {
    taosMemoryFree(raw.raw);
  } else if (raw.raw_type == RES_TYPE__TMQ_RAWDATA && raw.raw != NULL) {
    // Step back with plain pointer subtraction; negating sizeof() yields a huge unsigned
    // value and depends on pointer wraparound.
    void* allocStart = (char*)raw.raw - sizeof(SMqRspHead);
    taosMemoryFree(allocStart);
  }
  (void)memset(terrMsg, 0, ERR_MSG_LEN);
}
107,251✔
1800

1801
static int32_t writeRawInit() {
146,765✔
1802
  while (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_START) {
154,153✔
1803
    int8_t old = atomic_val_compare_exchange_8(&initFlag, 0, 1);
7,388✔
1804
    if (old == 0) {
7,388✔
1805
      int32_t code = initRawCacheHash();
7,388✔
1806
      if (code != 0) {
7,388✔
UNCOV
1807
        uError("tmq writeRawImpl init error:%d", code);
×
1808
        atomic_store_8(&initedFlag, WRITE_RAW_INIT_FAIL);
×
1809
        return code;
×
1810
      }
1811
      atomic_store_8(&initedFlag, WRITE_RAW_INIT_OK);
7,388✔
1812
    }
1813
  }
1814

1815
  if (atomic_load_8(&initedFlag) == WRITE_RAW_INIT_FAIL) {
146,765✔
UNCOV
1816
    return TSDB_CODE_INTERNAL_ERROR;
×
1817
  }
1818
  return 0;
146,765✔
1819
}
1820

1821
// Dispatch a raw payload to the handler matching its message/result type.
//
// taos - connection handle.
// buf  - payload bytes.
// len  - payload length.
// type - TDMT_VND_* message type or RES_TYPE__TMQ* result type.
//
// Returns TSDB_CODE_INVALID_PARA for NULL arguments or an unknown type,
// TSDB_CODE_INTERNAL_ERROR when writeRawInit() fails, otherwise the handler's result.
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
  if (taos == NULL || buf == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  if (writeRawInit() != 0) {
    return TSDB_CODE_INTERNAL_ERROR;
  }

  uDebug("writeRawImpl type:%d, buf:%p, len:%d", type, buf, len);

  int32_t code = TSDB_CODE_SUCCESS;
  switch (type) {
    case TDMT_VND_CREATE_STB:
    case TDMT_VND_ALTER_STB:  // create and alter share one handler
      code = taosCreateStb(taos, buf, len);
      break;
    case TDMT_VND_DROP_STB:
      code = taosDropStb(taos, buf, len);
      break;
    case TDMT_VND_CREATE_TABLE:
      code = taosCreateTable(taos, buf, len);
      break;
    case TDMT_VND_ALTER_TABLE:
      code = taosAlterTable(taos, buf, len);
      break;
    case TDMT_VND_DROP_TABLE:
      code = taosDropTable(taos, buf, len);
      break;
    case TDMT_VND_DELETE:
      code = taosDeleteData(taos, buf, len);
      break;
    case RES_TYPE__TMQ_METADATA:
      code = tmqWriteRawMetaDataImpl(taos, buf, len);
      break;
    case RES_TYPE__TMQ_RAWDATA:
      code = tmqWriteRawRawDataImpl(taos, buf, len);
      break;
    case RES_TYPE__TMQ:
      code = tmqWriteRawDataImpl(taos, buf, len);
      break;
    case RES_TYPE__TMQ_BATCH_META:
      code = tmqWriteBatchMetaDataImpl(taos, buf, len);
      break;
    default:
      uError("writeRawImpl unknown type:%d", type);
      code = TSDB_CODE_INVALID_PARA;
      break;
  }
  return code;
}
1858

1859
// Public entry point: write a raw payload (as produced by tmq_get_raw) through `taos`.
//
// Rejects a NULL connection, a NULL buffer or a non-positive length with
// TSDB_CODE_INVALID_PARA and an error message. Otherwise clears the global error
// message, dispatches via writeRawImpl() and returns its result, logging failures.
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
  const bool badArgs = (taos == NULL) || (raw.raw == NULL) || (raw.raw_len <= 0);
  if (badArgs) {
    SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
    return TSDB_CODE_INVALID_PARA;
  }

  taosClearErrMsg();  // clear global error message
  uDebug("tmq_write_raw connId:0x%" PRIx64 ", raw_type:%d, raw_len:%d", *(int64_t*)taos, raw.raw_type, raw.raw_len);

  int32_t code = writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
  if (code == TSDB_CODE_SUCCESS) {
    return code;
  }

  uError("tmq_write_raw connId:0x%" PRIx64 " failed, raw_type:%d, code:%s", *(int64_t*)taos, raw.raw_type, tstrerror(code));
  return code;
}
1873

1874
// Write a batch-meta payload: decode the batch, then replay every contained meta
// response through writeRawImpl() in order, stopping at the first failure.
//
// taos    - connection handle.
// meta    - serialized SMqBatchMetaRsp.
// metaLen - length of `meta` in bytes.
//
// Each batch item is an SMqRspHead followed by an encoded SMqMetaRsp; items shorter
// than the head are rejected with TSDB_CODE_INVALID_PARA.
//
// Returns 0 when every item was written, otherwise the first error encountered.
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) {
  if (taos == NULL || meta == NULL) {
    uError("invalid parameter in %s", __func__);
    return TSDB_CODE_INVALID_PARA;
  }
  SMqBatchMetaRsp rsp = {0};
  SDecoder        coder = {0};
  SDecoder        metaCoder = {0};
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;

  RAW_LOG_START
  // decode and process req
  tDecoderInit(&coder, meta, metaLen);
  RAW_RETURN_CHECK(tDecodeMqBatchMetaRsp(&coder, &rsp));
  int32_t num = taosArrayGetSize(rsp.batchMetaReq);
  uDebug("%s batch meta count:%d, metaLen:%d", __func__, num, metaLen);
  for (int32_t i = 0; i < num; i++) {
    int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
    RAW_NULL_CHECK(len);
    void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
    RAW_NULL_CHECK(tmpBuf);

    // The head is skipped below; a shorter item would make the remaining length wrap.
    if (*len < (int32_t)sizeof(SMqRspHead)) {
      uError("%s batch item %d/%d too short, len:%d", __func__, i, num, *len);
      code = TSDB_CODE_INVALID_PARA;
      lino = __LINE__;
      goto end;
    }

    SMqMetaRsp metaRsp = {0};
    tDecoderInit(&metaCoder, POINTER_SHIFT(tmpBuf, sizeof(SMqRspHead)), *len - sizeof(SMqRspHead));
    code = tDecodeMqMetaRsp(&metaCoder, &metaRsp);
    if (code != 0) {
      lino = __LINE__;
      // Release whatever the decoder allocated before it failed.
      // NOTE(review): assumes tDeleteMqMetaRsp tolerates a partially decoded response.
      tDeleteMqMetaRsp(&metaRsp);
      goto end;  // metaCoder is cleared at `end`
    }

    // Keep the type for the failure log; metaRsp is released before the log is written.
    int32_t msgType = metaRsp.resMsgType;
    uDebug("%s processing batch item %d/%d, resMsgType:%d, metaRspLen:%d", __func__, i, num,
           metaRsp.resMsgType, metaRsp.metaRspLen);
    code = writeRawImpl(taos, metaRsp.metaRsp, metaRsp.metaRspLen, metaRsp.resMsgType);
    tDeleteMqMetaRsp(&metaRsp);
    tDecoderClear(&metaCoder);
    if (code != TSDB_CODE_SUCCESS) {
      uError("%s batch item %d/%d failed, resMsgType:%d, code:%s", __func__, i, num,
             msgType, tstrerror(code));
      goto end;
    }
  }

end:
  tDecoderClear(&coder);
  tDecoderClear(&metaCoder);
  tDeleteMqBatchMetaRsp(&rsp);
  RAW_LOG_END
  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc