Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(snapshot): Enable snapshot streaming after bulkload #8938

Open
wants to merge 30 commits into
base: main
Choose a base branch
from

Conversation

all-seeing-code
Copy link
Contributor

@all-seeing-code all-seeing-code commented Aug 8, 2023

Description: Enable snapshot streaming after bulkload

Summary: Previously, the bulkloaded p directory couldn't stream to a new alpha due to a commitTs of zero. In this PR, the commitTs is sourced from the p directory, allowing the alpha to create a snapshot and subsequently stream it to another alpha.

Tests:

  • TestBulkLoaderSnapshotPDirinAlpha0: Load a p directory using bulkload. Then, initiate one alpha using the bulkloaded p directory and start a second alpha without any p directory. Query both alphas to ensure the snapshot has been successfully generated and that the data is accessible from both instances.
  • TestBulkLoaderSnapshotPDirinAll: Load a p directory using bulkload. Then, copy bulkloaded p directory in all alphas and start the cluster. Query all alphas to ensure that the data is accessible from all instances.
  • TestBulkLoaderDataLoss: Move the zero timestamp and load a p directory using bulkload. Then use this p directory on a fresh cluster (both zero and alpha are new). Validate that the query doesn't work without moving the timestamp of the new zero.

Closes: https://dgraph.atlassian.net/browse/DGRAPHCORE-214

Docs: NA

@dgraph-bot dgraph-bot added area/testing Testing related issues area/core internal mechanisms go Pull requests that update Go code labels Aug 8, 2023
worker/draft.go Outdated
Comment on lines 1857 to 1858
// Need a delay otherwise it interfers with starting of Raft loop
time.AfterFunc(1*time.Second, func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can i ask how it might interfere with the starting of Raft loop?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, and we should avoid it. This will turn into a bad idea, soon enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the node initializes, it begins in a follower state, even in a single alpha cluster scenario. This pause ensures that the leader election process occurs, allowing the node to acquire leadership. Without this pause, the node tries to capture a snapshot but fails the check in n.AmLeader(), resulting in no snapshot being taken.

Copy link
Contributor

@mangalaman93 mangalaman93 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need more tests as we discussed, a few questions and minor comments are dgraphtest package

.gitignore Outdated Show resolved Hide resolved
dgraphtest/dgraph.go Outdated Show resolved Hide resolved
dgraphtest/local_cluster.go Outdated Show resolved Hide resolved
dgraphtest/local_cluster.go Outdated Show resolved Hide resolved
@@ -626,6 +702,32 @@ func (c *LocalCluster) Client() (*GrpcClient, func(), error) {
return &GrpcClient{Dgraph: client}, cleanup, nil
}

// Client returns a grpc client that can talk to any Alpha in the cluster
func (c *LocalCluster) ClientForAlpha(id int) (*GrpcClient, func(), error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should define this function such that existing (*LocalCluster).Client function can use it too. Feels duplicated code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defined one for conn, clients are little high level to be able to use one for the other. Let me know if the modified piece is an ok abstraction.

worker/draft.go Outdated Show resolved Hide resolved
worker/draft.go Outdated Show resolved Hide resolved
worker/draft.go Outdated Show resolved Hide resolved
worker/draft.go Outdated Show resolved Hide resolved
worker/draft.go Outdated Show resolved Hide resolved
@all-seeing-code
Copy link
Contributor Author

We need more tests as we discussed, a few questions and minor comments are dgraphtest package

I have added more tests and extended the dgraphtest package to support some of the functions required for the new tests. This PR is ready for another round of reviews.

Copy link
Contributor

@mangalaman93 mangalaman93 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, we need to think about more test cases. A few minor comments and questions.

dgraphtest/cluster.go Outdated Show resolved Hide resolved
dgraphtest/cluster.go Outdated Show resolved Hide resolved
return fmt.Errorf("failed to assign state. error: %s", resp.Errors[0].Message)
}

if resp, err := parseAssignIdResponse(body); err == nil && resp.StartId != "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if there an error here?

@@ -92,13 +93,29 @@ type dnode interface {
zeroURL(*LocalCluster) (string, error)
}

type nodeType interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did we define a new interface? Can we not use the existing interface?

if err := os.MkdirAll(pDir, os.ModePerm); err != nil {
return nil, errors.Wrap(err, "erorr creating bulk dir")
}
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we checking for error here again?

@@ -493,3 +493,28 @@ func (c *LocalCluster) BulkLoad(opts BulkOpts) error {
return nil
}
}

func (c *LocalCluster) CopyBulkLoadDirsToAlphaMounts() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we doing the copy when we can just let the data stream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for the test when we want to copy p directories in each of the alphas in the cluster and start them. This is different from the case when we let the data stream from one alpha to all the other alpha (covered in: TestBulkLoaderSnapshotPDirinAlpha0)

@@ -183,6 +184,13 @@ func (cc ClusterConfig) WithBulkLoadOutDir(dir string) ClusterConfig {
return cc
}

// WithBulkLoadpDirIn sets the p dir for the alphas. This controls
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this option is not clear. Took me some time to make sense of it. We should try to come up with a better name and possible value.


// Start and query each alpha
for i := 0; i < 3; i++ {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove the newlines

@@ -1837,6 +1852,11 @@ func (n *node) InitAndStartNode() {
n.SetRaft(raft.StartNode(n.Cfg, peers))
// Trigger election, so this node can become the leader of this single-node cluster.
n.canCampaign = true
// Also trigger a snapshot so that this node can take a snapshot if required
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we loop on whether a leader is chosen instead?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/core internal mechanisms area/testing Testing related issues go Pull requests that update Go code
Development

Successfully merging this pull request may close these issues.

None yet

4 participants