Skip to content

Commit

Permalink
Merge branch 'release/s110' of https://github.com/taosdata/TDengine i…
Browse files Browse the repository at this point in the history
…nto release/s110
  • Loading branch information
plum-lihui committed Dec 26, 2020
2 parents 19ac151 + ca56f33 commit 55ceaad
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 12 deletions.
2 changes: 1 addition & 1 deletion packaging/tools/set_core.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ GREEN_DARK='\033[0;32m'
GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m'

set -e
# set -e
# set -x
corePath=$1

Expand Down
1 change: 0 additions & 1 deletion src/client/src/tscFunctionImpl.c
Original file line number Diff line number Diff line change
Expand Up @@ -2625,7 +2625,6 @@ static bool apercentile_function_setup(SQLFunctionCtx *pCtx) {

char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
printf("%p, %p\n", pInfo->pHisto, pInfo->pHisto->elems);
return true;
}

Expand Down
96 changes: 87 additions & 9 deletions src/kit/taosdemo/taosdemo.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ typedef struct DemoArguments {
{0, 'P', "password", 0, "The password to use when connecting to the server. Default is 'taosdata'.", 3},
#endif
{0, 'd', "database", 0, "Destination database. Default is 'test'.", 3},
{0, 'a', "replica", 0, "Set the replica parameters of the database, Default 1, min: 1, max: 3.", 3},
{0, 'a', "replica", 0, "Set the replica parameters of the database, Default 1, min: 1, max: 3.", 3},
{0, 'm', "table_prefix", 0, "Table prefix name. Default is 't'.", 3},
{0, 's', "sql file", 0, "The select sql file.", 3},
{0, 'M', 0, 0, "Use metric flag.", 13},
Expand Down Expand Up @@ -205,10 +205,10 @@ typedef struct DemoArguments {
arguments->tb_prefix = arg;
break;
case 'M':
arguments->use_metric = false;
arguments->use_metric = true;
break;
case 'x':
arguments->insert_only = false;
arguments->insert_only = true;
break;
case 'c':
if (wordexp(arg, &full_path, 0) != 0) {
Expand Down Expand Up @@ -406,9 +406,9 @@ typedef struct DemoArguments {
} else if (strcmp(argv[i], "-m") == 0) {
arguments->tb_prefix = argv[++i];
} else if (strcmp(argv[i], "-M") == 0) {
arguments->use_metric = false;
arguments->use_metric = true;
} else if (strcmp(argv[i], "-x") == 0) {
arguments->insert_only = false;
arguments->insert_only = true;
} else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]);
} else if (strcmp(argv[i], "-O") == 0) {
Expand Down Expand Up @@ -476,6 +476,14 @@ typedef struct {
int notFinished;
tsem_t lock_sem;
int counter;

// insert delay statitics
int64_t cntDelay;
int64_t totalDelay;
int64_t avgDelay;
int64_t maxDelay;
int64_t minDelay;

} info;

typedef struct {
Expand Down Expand Up @@ -575,7 +583,7 @@ int main(int argc, char *argv[]) {
arguments.num_of_DPT = 100000;
arguments.num_of_RPR = 1000;
arguments.use_metric = true;
arguments.insert_only = true;
arguments.insert_only = false;
// end change

parse_args(argc, argv, &arguments);
Expand Down Expand Up @@ -739,6 +747,9 @@ int main(int argc, char *argv[]) {
printf("Inserting data......\n");
pthread_t *pids = malloc(threads * sizeof(pthread_t));
info *infos = malloc(threads * sizeof(info));

memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(info));

int a = ntables / threads;
if (a < 1) {
Expand Down Expand Up @@ -768,6 +779,7 @@ int main(int argc, char *argv[]) {
t_info->end_table_id = i < b ? last + a : last + a - 1;
last = t_info->end_table_id + 1;
t_info->counter = 0;
t_info->minDelay = INT16_MAX;

tsem_init(&(t_info->mutex_sem), 0, 1);
t_info->notFinished = t_info->end_table_id - t_info->start_table_id + 1;
Expand Down Expand Up @@ -799,12 +811,29 @@ int main(int argc, char *argv[]) {
t, (int64_t)ntables * nrecords_per_table, nrecords_per_request,
(int64_t)ntables * nrecords_per_table / t);

int64_t totalDelay = 0;
int64_t maxDelay = 0;
int64_t minDelay = INT16_MAX;
int64_t cntDelay = 0;
double avgDelay = 0;
for (int i = 0; i < threads; i++) {
info *t_info = infos + i;
taos_close(t_info->taos);
tsem_destroy(&(t_info->mutex_sem));
tsem_destroy(&(t_info->lock_sem));

totalDelay += t_info->totalDelay;
cntDelay += t_info->cntDelay;
if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay;
if (t_info->minDelay < minDelay) minDelay = t_info->minDelay;
}
avgDelay = (double)totalDelay / cntDelay;

fprintf(fp, "insert delay, avg:%10.6fms, max: %10.6fms, min: %10.6fms\n\n",
avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0);

printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n",
avgDelay/1000.0, (double)maxDelay/1000.0, (double)minDelay/1000.0);

free(pids);
free(infos);
Expand Down Expand Up @@ -859,7 +888,7 @@ int main(int argc, char *argv[]) {
}


if (!insert_only) {
if (false == insert_only) {
// query data
pthread_t read_id;
info *rInfo = malloc(sizeof(info));
Expand Down Expand Up @@ -998,7 +1027,7 @@ void * createTable(void *sarg)
/* Create all the tables; */
printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", winfo->db_name, winfo->tb_prefix, i, winfo->cols);
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s);", winfo->db_name, winfo->tb_prefix, i, winfo->cols);
queryDB(winfo->taos, command);
}
} else {
Expand Down Expand Up @@ -1204,6 +1233,41 @@ void *readMetric(void *sarg) {
return NULL;
}

static int queryDbExec(TAOS *taos, char *command, int type) {
int i;
TAOS_RES *res = NULL;
int32_t code = -1;

for (i = 0; i < 5; i++) {
if (NULL != res) {
taos_free_result(res);
res = NULL;
}

res = taos_query(taos, command);
code = taos_errno(res);
if (0 == code) {
break;
}
}

if (code != 0) {
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(res));
taos_free_result(res);
//taos_close(taos);
return -1;
}

if (1 == type) {
int affectedRows = taos_affected_rows(res);
taos_free_result(res);
return affectedRows;
}

taos_free_result(res);
return 0;
}

void queryDB(TAOS *taos, char *command) {
int i;
TAOS_RES *pSql = NULL;
Expand Down Expand Up @@ -1273,7 +1337,21 @@ void *syncWrite(void *sarg) {
}

/* puts(buffer); */
queryDB(winfo->taos, buffer);
int64_t startTs;
int64_t endTs;
startTs = taosGetTimestampUs();
//queryDB(winfo->taos, buffer);
int affectedRows = queryDbExec(winfo->taos, buffer, 1);

if (0 <= affectedRows){
endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
if (delay > winfo->maxDelay) winfo->maxDelay = delay;
if (delay < winfo->minDelay) winfo->minDelay = delay;
winfo->cntDelay++;
winfo->totalDelay += delay;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
}

if (tID == winfo->end_table_id) {
i = inserted;
Expand Down
2 changes: 1 addition & 1 deletion src/query/src/qResultbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32

// allocate buf
if (availablePage == NULL) {
pi->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES);
pi->pData = calloc(1, pResultBuf->pageSize + POINTER_BYTES + 2); // add extract bytes in case of zipped buffer increased.
} else {
pi->pData = availablePage;
}
Expand Down

0 comments on commit 55ceaad

Please sign in to comment.