Skip to content
This repository was archived by the owner on Dec 1, 2022. It is now read-only.

Commit 5713b46

Browse files
lionel.liu@vesoft.companda-sheepcritical27yixinglu
authored
toss (#430)
* addEdge only send to out side * add atomic_edge syntax for create space * add toss logic for upateEdge * root_ = outNode; * InsertExecutor sent if use toss * change syntax of atomic_edge from int to bool * remove unused status * set spaceDesc_.isolation_level directly will not set spaceDesc_.__isset.isolation_level. Co-authored-by: panda-sheep <59197347+panda-sheep@users.noreply.github.com> Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> Co-authored-by: Yee <2520865+yixinglu@users.noreply.github.com>
1 parent 9bf832b commit 5713b46

File tree

8 files changed

+95
-29
lines changed

8 files changed

+95
-29
lines changed

src/executor/mutate/InsertExecutor.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ folly::Future<Status> InsertEdgesExecutor::insertEdges() {
4949
return qctx()->getStorageClient()->addEdges(ieNode->getSpace(),
5050
ieNode->getEdges(),
5151
ieNode->getPropNames(),
52-
ieNode->getOverwritable())
52+
ieNode->getOverwritable(),
53+
nullptr,
54+
ieNode->useChainInsert())
5355
.via(runner())
5456
.ensure([addEdgeTime]() {
5557
VLOG(1) << "Add edge time: " << addEdgeTime.elapsedInUSec() << "us";

src/parser/AdminSentences.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ class SpaceOptItem final {
153153
REPLICA_FACTOR,
154154
VID_TYPE,
155155
CHARSET,
156-
COLLATE
156+
COLLATE,
157+
ATOMIC_EDGE
157158
};
158159

159160
SpaceOptItem(OptionType op, std::string val) {
@@ -171,6 +172,11 @@ class SpaceOptItem final {
171172
optValue_ = std::move(val);
172173
}
173174

175+
SpaceOptItem(OptionType op, bool val) {
176+
optType_ = op;
177+
optValue_ = val ? 1 : 0;
178+
}
179+
174180
int64_t asInt() const {
175181
return boost::get<int64_t>(optValue_);
176182
}
@@ -246,6 +252,15 @@ class SpaceOptItem final {
246252
return optType_;
247253
}
248254

255+
int64_t getAtomicEdge() const {
256+
if (isInt()) {
257+
return asInt();
258+
} else {
259+
LOG(ERROR) << "atomic_edge value illegal.";
260+
return 0;
261+
}
262+
}
263+
249264
std::string toString() const;
250265

251266
private:

src/parser/parser.yy

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ static constexpr size_t MAX_ABS_INTEGER = 9223372036854775808ULL;
149149
%token KW_TAG KW_TAGS KW_UNION KW_INTERSECT KW_MINUS
150150
%token KW_NO KW_OVERWRITE KW_IN KW_DESCRIBE KW_DESC KW_SHOW KW_HOST KW_HOSTS KW_PART KW_PARTS KW_ADD
151151
%token KW_PARTITION_NUM KW_REPLICA_FACTOR KW_CHARSET KW_COLLATE KW_COLLATION KW_VID_TYPE
152+
%token KW_ATOMIC_EDGE
152153
%token KW_DROP KW_REMOVE KW_SPACES KW_INGEST KW_INDEX KW_INDEXES
153154
%token KW_IF KW_NOT KW_EXISTS KW_WITH
154155
%token KW_COUNT KW_COUNT_DISTINCT KW_SUM KW_AVG KW_MAX KW_MIN KW_STD KW_BIT_AND KW_BIT_OR KW_BIT_XOR
@@ -436,6 +437,7 @@ unreserved_keyword
436437
| KW_CHARSET { $$ = new std::string("charset"); }
437438
| KW_COLLATE { $$ = new std::string("collate"); }
438439
| KW_COLLATION { $$ = new std::string("collation"); }
440+
| KW_ATOMIC_EDGE { $$ = new std::string("atomic_edge"); }
439441
| KW_TTL_DURATION { $$ = new std::string("ttl_duration"); }
440442
| KW_TTL_COL { $$ = new std::string("ttl_col"); }
441443
| KW_SNAPSHOT { $$ = new std::string("snapshot"); }
@@ -2771,6 +2773,9 @@ space_opt_item
27712773
$$ = new SpaceOptItem(SpaceOptItem::VID_TYPE, *$3);
27722774
delete $3;
27732775
}
2776+
| KW_ATOMIC_EDGE ASSIGN BOOL {
2777+
$$ = new SpaceOptItem(SpaceOptItem::ATOMIC_EDGE, $3);
2778+
}
27742779
// TODO(YT) Create Spaces for different engines
27752780
// KW_ENGINE_TYPE ASSIGN name_label
27762781
;

src/parser/scanner.lex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ IP_OCTET ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])
164164
"CHARSET" { return TokenType::KW_CHARSET; }
165165
"COLLATE" { return TokenType::KW_COLLATE; }
166166
"COLLATION" { return TokenType::KW_COLLATION; }
167+
"ATOMIC_EDGE" { return TokenType::KW_ATOMIC_EDGE; }
167168
"ALL" { return TokenType::KW_ALL; }
168169
"LEADER" { return TokenType::KW_LEADER; }
169170
"UUID" { return TokenType::KW_UUID; }

src/parser/test/ParserTest.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,20 @@ TEST(Parser, SpaceOperation) {
227227
auto result = parser.parse(query);
228228
ASSERT_TRUE(result.ok()) << result.status();
229229
}
230+
{
231+
GQLParser parser;
232+
std::string query = "CREATE SPACE default_space(partition_num=9, replica_factor=3,"
233+
"atomic_edge=true)";
234+
auto result = parser.parse(query);
235+
EXPECT_TRUE(result.ok()) << result.status();
236+
}
237+
{
238+
GQLParser parser;
239+
std::string query = "CREATE SPACE default_space(partition_num=9, replica_factor=3,"
240+
"atomic_edge=FALSE)";
241+
auto result = parser.parse(query);
242+
EXPECT_TRUE(result.ok()) << result.status();
243+
}
230244
}
231245

232246
TEST(Parser, TagOperation) {

src/planner/Mutate.h

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,15 @@ class InsertEdges final : public SingleInputNode {
7878
GraphSpaceID spaceId,
7979
std::vector<storage::cpp2::NewEdge> edges,
8080
std::vector<std::string> propNames,
81-
bool overwritable) {
81+
bool overwritable,
82+
bool useChainInsert = false) {
8283
return qctx->objPool()->add(new InsertEdges(qctx,
8384
input,
8485
spaceId,
8586
std::move(edges),
8687
std::move(propNames),
87-
overwritable));
88+
overwritable,
89+
useChainInsert));
8890
}
8991

9092
std::unique_ptr<PlanNodeDescription> explain() const override;
@@ -105,24 +107,33 @@ class InsertEdges final : public SingleInputNode {
105107
return spaceId_;
106108
}
107109

110+
bool useChainInsert() const {
111+
return useChainInsert_;
112+
}
113+
108114
private:
109115
InsertEdges(QueryContext* qctx,
110116
PlanNode* input,
111117
GraphSpaceID spaceId,
112118
std::vector<storage::cpp2::NewEdge> edges,
113119
std::vector<std::string> propNames,
114-
bool overwritable)
120+
bool overwritable,
121+
bool useChainInsert)
115122
: SingleInputNode(qctx, Kind::kInsertEdges, input),
116123
spaceId_(spaceId),
117124
edges_(std::move(edges)),
118125
propNames_(std::move(propNames)),
119-
overwritable_(overwritable) {}
126+
overwritable_(overwritable),
127+
useChainInsert_(useChainInsert) {}
120128

121129
private:
122130
GraphSpaceID spaceId_{-1};
123131
std::vector<storage::cpp2::NewEdge> edges_;
124132
std::vector<std::string> propNames_;
125133
bool overwritable_;
134+
// if this enabled, add edge request will only sent to
135+
// outbound edges. (toss)
136+
bool useChainInsert_{false};
126137
};
127138

128139
class Update : public SingleInputNode {

src/validator/AdminValidator.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,13 @@ Status CreateSpaceValidator::validateImpl() {
8282
spaceDesc_.collate_name = std::move(result);
8383
break;
8484
}
85+
case SpaceOptItem::ATOMIC_EDGE: {
86+
if (item->getAtomicEdge()) {
87+
spaceDesc_.set_isolation_level(meta::cpp2::IsolationLevel::TOSS);
88+
} else {
89+
spaceDesc_.set_isolation_level(meta::cpp2::IsolationLevel::DEFAULT);
90+
}
91+
}
8592
}
8693
}
8794

src/validator/MutateValidator.cpp

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,14 @@ Status InsertEdgesValidator::validateImpl() {
147147
}
148148

149149
Status InsertEdgesValidator::toPlan() {
150+
auto useChainInsert = space_.spaceDesc.isolation_level == meta::cpp2::IsolationLevel::TOSS;
150151
auto doNode = InsertEdges::make(qctx_,
151152
nullptr,
152153
spaceId_,
153154
std::move(edges_),
154155
std::move(propNames_),
155-
overwritable_);
156+
overwritable_,
157+
useChainInsert);
156158
root_ = doNode;
157159
tail_ = root_;
158160
return Status::OK();
@@ -186,7 +188,9 @@ Status InsertEdgesValidator::check() {
186188
}
187189

188190
Status InsertEdgesValidator::prepareEdges() {
189-
edges_.reserve(rows_.size()*2);
191+
auto useToss = space_.spaceDesc.isolation_level == meta::cpp2::IsolationLevel::TOSS;
192+
auto size = useToss ? rows_.size() : rows_.size() * 2;
193+
edges_.reserve(size);
190194
for (auto i = 0u; i < rows_.size(); i++) {
191195
auto *row = rows_[i];
192196
if (propNames_.size() != row->values().size()) {
@@ -236,11 +240,13 @@ Status InsertEdgesValidator::prepareEdges() {
236240
edge.__isset.props = true;
237241
edges_.emplace_back(edge);
238242

239-
// inbound
240-
edge.key.set_src(dstId);
241-
edge.key.set_dst(srcId);
242-
edge.key.set_edge_type(-edgeType_);
243-
edges_.emplace_back(std::move(edge));
243+
if (!useToss) {
244+
// inbound
245+
edge.key.set_src(dstId);
246+
edge.key.set_dst(srcId);
247+
edge.key.set_edge_type(-edgeType_);
248+
edges_.emplace_back(std::move(edge));
249+
}
244250
}
245251

246252
return Status::OK();
@@ -664,22 +670,27 @@ Status UpdateEdgeValidator::toPlan() {
664670
{},
665671
condition_,
666672
{});
667-
668-
auto *inNode = UpdateEdge::make(qctx_,
669-
outNode,
670-
spaceId_,
671-
std::move(name_),
672-
std::move(dstId_),
673-
std::move(srcId_),
674-
-edgeType_,
675-
rank_,
676-
insertable_,
677-
std::move(updatedProps_),
678-
std::move(returnProps_),
679-
std::move(condition_),
680-
std::move(yieldColNames_));
681-
root_ = inNode;
682-
tail_ = outNode;
673+
auto useToss = space_.spaceDesc.isolation_level == meta::cpp2::IsolationLevel::TOSS;
674+
if (useToss) {
675+
root_ = outNode;
676+
tail_ = root_;
677+
} else {
678+
auto *inNode = UpdateEdge::make(qctx_,
679+
outNode,
680+
spaceId_,
681+
std::move(name_),
682+
std::move(dstId_),
683+
std::move(srcId_),
684+
-edgeType_,
685+
rank_,
686+
insertable_,
687+
std::move(updatedProps_),
688+
std::move(returnProps_),
689+
std::move(condition_),
690+
std::move(yieldColNames_));
691+
root_ = inNode;
692+
tail_ = outNode;
693+
}
683694
return Status::OK();
684695
}
685696

0 commit comments

Comments
 (0)