Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Archived/Feat/td 28622 3.0 #25520

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
67d57c3
refact: adjust some type names in sync
bgzhao66 Mar 8, 2024
1b42058
refact: improve code of serialization of tsdb snap info
bgzhao66 Mar 12, 2024
b57f8c1
feat: add oid and blob fields in SSubmitReq2
bgzhao66 Mar 19, 2024
138cf3e
feat: add func createBlobDataType for parser
bgzhao66 Mar 21, 2024
16fe58d
feat: create blob data type in sql.y
bgzhao66 Mar 21, 2024
1225665
feat: parse blob data types
bgzhao66 Mar 21, 2024
129c91e
enh: insert data of basic blob
bgzhao66 Mar 27, 2024
44ced4e
refact: change a typo of invalidFuncPara...
bgzhao66 Mar 27, 2024
c129adf
feat: support length for blob data type
bgzhao66 Mar 27, 2024
897bff7
enh: add STATIC_ASSERT for compile time assertion
bgzhao66 Apr 3, 2024
a1a0c41
feat: add library blob
bgzhao66 Apr 3, 2024
a7dda9f
feat: add include/libs/blob/blob.h and source/libs/blob.c
bgzhao66 Apr 3, 2024
3b85ca0
enh: refactor func taosHex2Ascii
bgzhao66 Apr 7, 2024
f81407a
refact: rename tEncodeSubmitReq to tEncodeSubmitTbDataReq
bgzhao66 Apr 9, 2024
e251eca
enh: check version of SSubmitReq2Msg encoding
bgzhao66 Apr 9, 2024
c20554a
enh: unify sync msgs of client request, raft entry and append entries…
bgzhao66 Apr 11, 2024
8426550
enh: build sync client request uniformly
bgzhao66 Apr 11, 2024
a323b95
enh: execute config change within syncFsmExecute
bgzhao66 Apr 11, 2024
ca0c7f7
feat: add blob test CMakeLists.txt
bgzhao66 Apr 18, 2024
cbaba37
feat: encode and decode blob data in submitReq
bgzhao66 Apr 18, 2024
3f43ddc
enh: ensure capacity of blob chunks
bgzhao66 Apr 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/client/taos.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ typedef void TAOS_SUB;
#define TSDB_DATA_TYPE_JSON 15 // json string
#define TSDB_DATA_TYPE_VARBINARY 16 // binary
#define TSDB_DATA_TYPE_DECIMAL 17 // decimal
#define TSDB_DATA_TYPE_BLOB 18 // binary
#define TSDB_DATA_TYPE_BLOB 18 // blob
#define TSDB_DATA_TYPE_MEDIUMBLOB 19
#define TSDB_DATA_TYPE_BINARY TSDB_DATA_TYPE_VARCHAR // string
#define TSDB_DATA_TYPE_GEOMETRY 20 // geometry
Expand Down
2 changes: 1 addition & 1 deletion include/common/tdataformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompre
int32_t tValueCompare(const SValue *tv1, const SValue *tv2);

// SRow ================================
int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow);
int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow, SArray *bOffsets);
int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
void tRowDestroy(SRow *pRow);
int32_t tRowSort(SArray *aRowP);
Expand Down
42 changes: 40 additions & 2 deletions include/common/tmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,12 @@ typedef struct {
int32_t vgId;
} SMsgHead;

typedef struct STlv {
int16_t type;
int32_t len;
char value[];
} STlv;

// Submit message for one table
typedef struct SSubmitBlk {
int64_t uid; // table unique id
Expand Down Expand Up @@ -4183,6 +4189,12 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);

#define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility

typedef enum {
E_TLV_SUBMIT_TB_DATA = 0,
E_TLV_SUBMIT_OID_DATA = 1,
E_TLV_SUBMIT_BLOB_DATA = 2,
} ESubmitReqTlvFlag;

typedef struct {
int32_t flags;
SVCreateTbReq* pCreateTbReq;
Expand All @@ -4196,8 +4208,18 @@ typedef struct {
int64_t ctimeMs;
} SSubmitTbData;

typedef struct {
SArray* aOids; // SArray<SBlobOidInfo>
} SSubmitOidData;

typedef struct {
SArray* aBlobs; // SArray<SBlobData*>
} SSubmitBlobData;

typedef struct {
SArray* aSubmitTbData; // SArray<SSubmitTbData>
SArray* aSubmitOidData; // SArray<SSubmitOidData>
SArray* aSubmitBlobData; // SArray<SSubmitBlobData>
} SSubmitReq2;

typedef struct {
Expand All @@ -4206,9 +4228,25 @@ typedef struct {
char data[]; // SSubmitReq2
} SSubmitReq2Msg;

int32_t tEncodeSubmitReq(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSubmitReq(SDecoder* pCoder, SSubmitReq2* pReq);
int32_t tEncodeSubmitReq2Msg(int32_t vgId, const SSubmitReq2* pReq, void** pData, uint32_t* pLen);
int32_t tDecodeSubmitReq2(void* pData, uint32_t len, int64_t version, SSubmitReq2* pReq);

int32_t tEncodeSubmitTbDataReq(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSubmitTbDataReq(SDecoder* pCoder, SSubmitReq2* pReq);

int32_t tSubmitReq2MsgVersion(const SSubmitReq2* pReq, bool* hasExOid, bool* hasExBlob);
static bool tSubmitReq2MsgWithTlv(int32_t version) { return version > 1; }

int32_t tEncodeSubmitOidDataReq(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSubmitOidDataReq(SDecoder* pCoder, SSubmitReq2* pReq);

int32_t tEncodeSubmitBlobDataReq(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSubmitBlobDataReq(SDecoder* pCoder, SSubmitReq2* pReq);

void tDestroySubmitTbData(SSubmitTbData* pTbData, int32_t flag);
void tDestroySubmitBlobData(void* ptr);
void tDestroySubmitOidData(void* ptr);

void tDestroySubmitReq(SSubmitReq2* pReq, int32_t flag);

typedef struct {
Expand Down
9 changes: 6 additions & 3 deletions include/common/ttypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ typedef int8_t col_type_t;
typedef int32_t col_bytes_t;
typedef int32_t schema_ver_t;
typedef int32_t func_id_t;
typedef int64_t TdOidT;

#pragma pack(push, 1)
typedef struct {
Expand Down Expand Up @@ -270,9 +271,11 @@ typedef struct {

#define IS_VAR_DATA_TYPE(t) \
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR) || \
((t) == TSDB_DATA_TYPE_JSON) || ((t) == TSDB_DATA_TYPE_GEOMETRY))
#define IS_STR_DATA_TYPE(t) \
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR))
((t) == TSDB_DATA_TYPE_JSON) || ((t) == TSDB_DATA_TYPE_GEOMETRY) || ((t) == TSDB_DATA_TYPE_BLOB))
#define IS_STR_DATA_TYPE(t) \
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR) || \
((t) == TSDB_DATA_TYPE_BLOB))
#define IS_BLOB_DATA_TYPE(t) ((t) == TSDB_DATA_TYPE_BLOB)

#define IS_VALID_TINYINT(_t) ((_t) >= INT8_MIN && (_t) <= INT8_MAX)
#define IS_VALID_SMALLINT(_t) ((_t) >= INT16_MIN && (_t) <= INT16_MAX)
Expand Down
131 changes: 131 additions & 0 deletions include/libs/blob/blob.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_BLOB_H_
#define _TD_BLOB_H_

#include "tarray.h"
#include "tdataformat.h"
#include "tmsg.h"

#ifdef __cplusplus
extern "C" {
#endif

#define BLOB_CHUNK_LVL 11

enum EBloblEntryFlag {
EBLOB_FLAG_DATA = 1,
EBLOB_FLAG_OID = 2,
};

#define BLOB_OID_INVALID -1

#pragma pack(push, 1)
typedef struct SBlobEntry {
int8_t flag;
union {
struct {
uint32_t len;
char data[];
} varData;
TdOidT oid;
};
} SBlobEntry;

typedef struct SBlobChunkOffset {
uint32_t offset;
uint16_t length;
} SBlobChunkOffset;

typedef struct SBlobOidOffset {
int32_t idx;
int32_t offset;
} SBlobOidOffset;

typedef struct SBlobDataHdr {
union {
TdOidT v;
SBlobOidOffset i;
} oid;
int64_t version;
uint32_t length;
uint8_t cmprAlg;
uint8_t chunkLvl;
uint16_t reserved;
uint32_t bodyLen;
} SBlobDataHdr;
#pragma pack(pop)

typedef struct SBlobOidInfo {
TSKEY ts;
TdOidT oid;
SBlobOidOffset info;
} SBlobOidInfo;

typedef struct SBlobChunk {
SBlobChunkOffset info;
char *pData;
} SBlobChunk;

typedef struct SBlobData {
TSKEY ts;
SBlobDataHdr hdr;
SArray *pChunks; // SArray<SBlobChunk>
} SBlobData;

// blob entry
int32_t tBlobEntrySize(SBlobEntry *pEntry);

// blob chunk
static int32_t tBlobChunkSize(uint8_t chunkLvl) { return (1 << chunkLvl); }
static int32_t tBlobChunkNum(int32_t length, int32_t chunkSize) {
return (length / chunkSize) + (length % chunkSize != 0);
}

int32_t blCreateBlobChunk(SBlobChunk *pChunk, uint32_t offset, uint32_t length);
void blDestroyBlobChunk(void *pChunk);

int32_t blChopDataIntoChunks(SBlobData *pBlob, const void *pData, int32_t length);
int32_t blJoinChunksIntoData(SBlobData *pBlob, void *pData, int32_t length);

// blob data
int32_t blReWriteBlobData(SColVal *pColVal, TSKEY ts, SBlobData **ppBlob);
int32_t blSeparateBlobData(const SBlobEntry *pEntry, SBlobData **ppBlob);

int32_t tPutSubmitBlobData(uint8_t *p, const SSubmitBlobData *pSubmitBlobData);
int32_t tGetSubmitBlobData(uint8_t *p, SSubmitBlobData *pSubmitBlobData);

int32_t tGetSubmitBlobDataArr(uint8_t *p, SArray *aSubmitBlobData);
int32_t tPutSubmitBlobDataArr(uint8_t *p, const SArray *aSubmitBlobData);

int32_t tEncodeSubmitOidData(SEncoder *pCoder, const SSubmitOidData *pSubmitOidData);
int32_t tDecodeSubmitOidData(SDecoder *pCoder, SSubmitOidData *pSubmitOidData);

SBlobData *blCreateBlobData();
void blFreeBlobDataImpl(void *ptr);

#define blFreeBlobData(pBlob) \
do { \
blFreeBlobDataImpl(pBlob); \
pBlob = NULL; \
} while (0)

int32_t blApplySubmitOidData(SSubmitReq2 *pReq);

#ifdef __cplusplus
}
#endif

#endif /*_TD_BLOB_H_*/
9 changes: 6 additions & 3 deletions include/libs/function/taosudf.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,12 @@ typedef uint16_t VarDataLenT; // maxVarDataLen: 65535
#define varDataCopy(dst, v) memcpy((dst), (void *)(v), varDataTLen(v))
#define varDataLenByData(v) (*(VarDataLenT *)(((char *)(v)) - VARSTR_HEADER_SIZE))
#define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT)(_len))
#define IS_VAR_DATA_TYPE(t) \
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR) || ((t) == TSDB_DATA_TYPE_JSON) || ((t) == TSDB_DATA_TYPE_GEOMETRY))
#define IS_STR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR))
#define IS_VAR_DATA_TYPE(t) \
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR) || \
((t) == TSDB_DATA_TYPE_JSON) || ((t) == TSDB_DATA_TYPE_GEOMETRY) || ((t) == TSDB_DATA_TYPE_BLOB))
#define IS_STR_DATA_TYPE(t) \
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR) || \
((t) == TSDB_DATA_TYPE_BLOB))

static FORCE_INLINE char *udfColDataGetData(const SUdfColumn *pColumn, int32_t row) {
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
Expand Down
4 changes: 3 additions & 1 deletion include/libs/qcom/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,13 @@ typedef struct STableDataCxt {
STableMeta* pMeta;
STSchema* pSchema;
SBoundColInfo boundColsInfo;
SArray* pValues;
SArray* pValues; // SArray<SColVal>
SSubmitTbData* pData;
SRowKey lastKey;
bool ordered;
bool duplicateTs;
SArray* pBlobs; // SArray<SBlobData*>
SArray* bOffsets; // SArray<int32_t>
} STableDataCxt;

typedef int32_t (*__async_send_cb_fn_t)(void* param, SDataBuf* pMsg, int32_t code);
Expand Down
12 changes: 6 additions & 6 deletions include/libs/sync/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ typedef int32_t SyncGroupId;
typedef int64_t SyncIndex;
typedef int64_t SyncTerm;

typedef struct SSyncNode SSyncNode;
typedef struct SSyncNode SyncNode;
typedef struct SWal SWal;
typedef struct SSyncRaftEntry SSyncRaftEntry;
typedef struct SSyncRaftEntry SyncRaftEntry;

typedef enum {
TAOS_SYNC_STATE_OFFLINE = 0,
Expand Down Expand Up @@ -225,8 +225,8 @@ typedef struct SSyncLogStore {
SyncIndex (*syncLogIndexRetention)(struct SSyncLogStore* pLogStore, int64_t bytes);
SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore);

int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forcSync);
int32_t (*syncLogGetEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SyncRaftEntry* pEntry, bool forcSync);
int32_t (*syncLogGetEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SyncRaftEntry** ppEntry);
int32_t (*syncLogTruncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);

} SSyncLogStore;
Expand Down Expand Up @@ -286,8 +286,8 @@ bool syncIsReadyForRead(int64_t rid);
bool syncSnapshotSending(int64_t rid);
bool syncSnapshotRecving(int64_t rid);
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq);
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg);
int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg);
int32_t syncForceBecomeFollower(SyncNode* ths, const SRpcMsg* pRpcMsg);
int32_t syncBecomeAssignedLeader(SyncNode* ths, SRpcMsg* pRpcMsg);

int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm);

Expand Down
2 changes: 2 additions & 0 deletions include/os/osDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ void syslog(int unused, const char *format, ...);
#define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b)))
#define POINTER_DISTANCE(p1, p2) ((char *)(p1) - (char *)(p2))

#define container_of(ptr, type, member) ((type *)((char *)(ptr)-offsetof(type, member)))

#ifndef UNUSED
#define UNUSED(x) ((void)(x))
#endif
Expand Down
1 change: 1 addition & 0 deletions include/os/osString.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ uint8_t taosStr2UInt8(const char *str, char **pEnd, int32_t radix);
double taosStr2Double(const char *str, char **pEnd);
float taosStr2Float(const char *str, char **pEnd);
int32_t taosHex2Ascii(const char *z, uint32_t n, void** data, uint32_t* size);
int32_t taosHex2AsciiImpl(const char *z, uint32_t n, void* data, uint32_t size);
int32_t taosAscii2Hex(const char *z, uint32_t n, void** data, uint32_t* size);
//int32_t taosBin2Ascii(const char *z, uint32_t n, void** data, uint32_t* size);
bool isHex(const char* z, uint32_t n);
Expand Down
1 change: 1 addition & 0 deletions include/util/tdef.h
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_NCHAR_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519
#define TSDB_MAX_GEOMETRY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519
#define TSDB_MAX_VARBINARY_LEN TSDB_MAX_FIELD_LEN // 16384-8:65519
#define TSDB_MAX_BLOB_LEN (20 * 1024 * 1024)

#define PRIMARYKEY_TIMESTAMP_COL_ID 1
#define COL_REACH_END(colId, maxColId) ((colId) > (maxColId))
Expand Down
6 changes: 6 additions & 0 deletions include/util/tlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ bool taosAssertRelease(bool condition);
#endif
#endif

#ifdef __GNUC__
#define STATIC_ASSERT(condition, msg) _Static_assert(condition, msg)
#else
#define STATIC_ASSERT(condition, msg)
#endif

void taosLogCrashInfo(char *nodeType, char *pMsg, int64_t msgLen, int signum, void *sigInfo);
void taosReadCrashInfo(char *filepath, char **pMsg, int64_t *pMsgLen, TdFilePtr *pFd);
void taosReleaseCrashLogFile(TdFilePtr pFile, bool truncateFile);
Expand Down
1 change: 1 addition & 0 deletions source/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ target_link_libraries(
common
PUBLIC os
PUBLIC util
PUBLIC blob
INTERFACE api
)

Expand Down
6 changes: 5 additions & 1 deletion source/common/src/tdatablock.c
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,10 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf,
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
if (len >= size - 1) return dumpBuf;
} break;
default:
uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
ASSERT(0);
break;
}
}
len += snprintf(dumpBuf + len, size - len, "%d\n", j);
Expand Down Expand Up @@ -2377,7 +2381,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
}
}
SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
if ((terrno = tRowBuild(pVals, pTSchema, &pRow, NULL)) < 0) {
tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
Expand Down