Skip to content

Commit

Permalink
Merge pull request #22313 from taosdata/fix/addStreamVer
Browse files Browse the repository at this point in the history
add stream verion
  • Loading branch information
plum-lihui committed Aug 4, 2023
2 parents 40b87a2 + 10da0dd commit f19826f
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 141 deletions.
28 changes: 15 additions & 13 deletions include/libs/stream/tstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ extern "C" {

typedef struct SStreamTask SStreamTask;

#define SSTREAM_TASK_VER 1
enum {
STREAM_STATUS__NORMAL = 0,
STREAM_STATUS__STOP,
Expand Down Expand Up @@ -266,13 +267,13 @@ typedef struct SCheckpointInfo {
} SCheckpointInfo;

typedef struct SStreamStatus {
int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus;
int8_t keepTaskStatus;
bool transferState;
int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused
int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus;
int8_t keepTaskStatus;
bool transferState;
int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused
} SStreamStatus;

typedef struct SHistDataRange {
Expand Down Expand Up @@ -309,6 +310,7 @@ typedef struct {
} STaskTimestamp;

struct SStreamTask {
int64_t ver;
SStreamId id;
SSTaskBasicInfo info;
STaskOutputInfo outputInfo;
Expand Down Expand Up @@ -589,10 +591,10 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamTaskEndScanWAL(SStreamTask* pTask);

SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);

char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);

// recover and fill history
void streamTaskCheckDownstreamTasks(SStreamTask* pTask);
Expand Down Expand Up @@ -628,7 +630,8 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);

// agg level
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq *pReq, SRpcHandleInfo* pRpcInfo);
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
SRpcHandleInfo* pRpcInfo);
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);

// stream task meta
Expand All @@ -642,7 +645,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);

Expand All @@ -659,7 +662,6 @@ int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask);


#ifdef __cplusplus
}
#endif
Expand Down

0 comments on commit f19826f

Please sign in to comment.