Skip to content

Commit

Permalink
fix: avoid coercing all err to EOF, add comments trying to trace othe…
Browse files Browse the repository at this point in the history
…r locations of EOF that might be impacting how Next() states iterator.Done before end is reached, googleapis#2601
  • Loading branch information
crwilcox committed Jan 30, 2021
1 parent c7ecf0f commit df76a8d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
5 changes: 5 additions & 0 deletions firestore/query.go
Expand Up @@ -785,7 +785,12 @@ func (it *QuerySnapshotIterator) Next() (*QuerySnapshot, error) {
}
btree, changes, readTime, err := it.ws.nextSnapshot()
if err != nil {
// TODO: verify if stream was completed, or EOF is from an early terminations
// NOTE(crwilcox): possible thought, add var to detect if stream is completed, or err early?
// if err == io.EOF && it.ws.streamCompleted {
if err == io.EOF {
// NOTE(crwilcox): this is where we have iterator done

err = iterator.Done
}
it.err = err
Expand Down
12 changes: 11 additions & 1 deletion firestore/watch.go
Expand Up @@ -155,6 +155,7 @@ func (s *watchStream) less(a, b *DocumentSnapshot) bool {
// Once nextSnapshot returns an error, it will always return the same error.
func (s *watchStream) nextSnapshot() (*btree.BTree, []DocumentChange, time.Time, error) {
if s.err != nil {
// NOTE(crwilcox): If s.err is already :(, return to user. Can this be EOF?
return nil, nil, time.Time{}, s.err
}
var changes []DocumentChange
Expand All @@ -163,12 +164,14 @@ func (s *watchStream) nextSnapshot() (*btree.BTree, []DocumentChange, time.Time,
for !s.handleNextMessage() {
}
if s.err != nil {
// NOTE(crwilcox): handleNextMessage can be EOF err. It is considered permannt.
_ = s.close() // ignore error
return nil, nil, time.Time{}, s.err
}
var newDocTree *btree.BTree
newDocTree, changes = s.computeSnapshot(s.docTree, s.docMap, s.changeMap, s.readTime)
if s.err != nil {
// NOTE(crwilcox): technically this could be erring with EOF, but seems unlikely.
return nil, nil, time.Time{}, s.err
}
// Only return a snapshot if something has changed, or this is the first snapshot.
Expand All @@ -187,6 +190,7 @@ func (s *watchStream) nextSnapshot() (*btree.BTree, []DocumentChange, time.Time,
func (s *watchStream) handleNextMessage() bool {
res, err := s.recv()
if err != nil {
// NOTE(crwilcox): can this be EOF? If so, this could be bubbling up from caller
s.err = err
// Errors returned by recv are permanent.
return true
Expand Down Expand Up @@ -446,9 +450,12 @@ func (s *watchStream) stop() {
return
}
if err != nil {
// if an error occurs while closing the stream
s.err = err
} else {
// if we close successfully,
s.err = io.EOF // normal shutdown
}
s.err = io.EOF // normal shutdown
}

func (s *watchStream) close() error {
Expand All @@ -468,6 +475,7 @@ func (s *watchStream) recv() (*pb.ListenResponse, error) {
s.lc, err = s.open()
if err != nil {
// Do not retry if open fails.
// NOTE(crwilcox): is this EOF?
return nil, err
}
}
Expand Down Expand Up @@ -499,6 +507,8 @@ func (s *watchStream) open() (pb.Firestore_ListenClient, error) {
})
}
if err != nil {
// NOTE: it seems likely, that of the errs, this is raising an io.EOF, which
// Ultimately is shown to users.
return nil, err
}
return lc, nil
Expand Down

0 comments on commit df76a8d

Please sign in to comment.