Skip to content

Commit

Permalink
Merge pull request #2192 from dedis/paginate-blocks-service
Browse files Browse the repository at this point in the history
Adds a PaginateBlocks service to byzcoin
  • Loading branch information
Gaylor Bosson committed Feb 6, 2020
2 parents d9a2f99 + 710dfa9 commit 793b6b0
Show file tree
Hide file tree
Showing 9 changed files with 3,227 additions and 65 deletions.
32 changes: 32 additions & 0 deletions byzcoin/proto.go
Expand Up @@ -301,6 +301,38 @@ type StreamingResponse struct {
Block *skipchain.SkipBlock
}

// PaginateRequest is a request to get NumPages times the consecutive list of
// PageSize blocks.
type PaginateRequest struct {
// The first block to fetch
StartID skipchain.SkipBlockID
// Determines the length of the Blocks attribute in the PaginateResponse.
// The list contains PageSize consecutive blocks
PageSize uint64
// The number of (asynchrounous) requests the service will return to the
// client. Requests are send in a consecutive order wrt their list of blocks
NumPages uint64
// If true then blocks are consecutive in the reverse order, ie. following
// backward links.
Backward bool
}

// PaginateResponse is a reponse from a PaginateRequest.
type PaginateResponse struct {
// A list of consecutive blocks
Blocks []*skipchain.SkipBlock
// The page number index: relevant if the clients asked for more than one
// asynchrounous reply from the service.
PageNumber uint64
// Tells if the result contains consecutive blocks in a reversed order.
Backward bool
// Used to tell the client if an error occured. Any error code not equal to
// 0 means that something special happened.
ErrorCode uint64
// A list of error messages in case something special happened.
ErrorText []string
}

// DownloadState requests the current global state of that node.
// If it is the first call to the service, then Reset
// must be true, else an error will be returned, or old data
Expand Down
2 changes: 1 addition & 1 deletion byzcoin/service.go
Expand Up @@ -2888,7 +2888,7 @@ func newService(c *onet.Context) (onet.Service, error) {
return nil, err
}

if err := s.RegisterStreamingHandlers(s.StreamTransactions); err != nil {
if err := s.RegisterStreamingHandlers(s.StreamTransactions, s.PaginateBlocks); err != nil {
return nil, xerrors.Errorf("registering handlers: %v", err)
}
s.RegisterProcessorFunc(viewChangeMsgID, s.handleViewChangeReq)
Expand Down
146 changes: 145 additions & 1 deletion byzcoin/streaming.go
@@ -1,14 +1,32 @@
package byzcoin

import (
"fmt"
"sync"

"go.dedis.ch/cothority/v3/skipchain"
"go.dedis.ch/onet/v3/network"
)

const (
// PaginateWrongInput is used when an invalid parameter is given in the
// paginate request
PaginateWrongInput = 2
// PaginateWrongStart is used when the startID is invalid
PaginateWrongStart = 3
// PaginatePageFailed is used when the first block of the page failed to be
// retreived
PaginatePageFailed = 4
// PaginateLinkMissing is used when the next (or previous if backward is
// true) block does not exist
PaginateLinkMissing = 5
// PaginateGetBlockFailed is used when it coulnd't get a next or previous block
PaginateGetBlockFailed = 6
)

func init() {
network.RegisterMessages(&StreamingRequest{}, &StreamingResponse{})
network.RegisterMessages(&StreamingRequest{}, &StreamingResponse{},
&PaginateRequest{}, &PaginateResponse{})
}

type streamingManager struct {
Expand Down Expand Up @@ -105,3 +123,129 @@ func (s *Service) StreamTransactions(msg *StreamingRequest) (chan *StreamingResp
}()
return outChan, stopChan, nil
}

// PaginateBlocks return blocks with pagination, ie. N asynchounous requests
// that contain each K consecutive block. The caller is responsible for closing
// the close chan when the caller wants to close the connection.
func (s *Service) PaginateBlocks(msg *PaginateRequest) (chan *PaginateResponse, chan bool, error) {

outChan := make(chan *PaginateResponse)
stopChan := make(chan bool)

go func() {

if msg.PageSize < 1 {
outChan <- &PaginateResponse{
ErrorCode: PaginateWrongInput,
ErrorText: []string{fmt.Sprintf("PageSize should be >= 1, "+
"but we found %d", msg.PageSize)},
}
return
}

if msg.NumPages < 1 {
outChan <- &PaginateResponse{
ErrorCode: PaginateWrongInput,
ErrorText: []string{fmt.Sprintf("NumPages should be >= 1, "+
"but we found %d", msg.NumPages)},
}
return
}

if msg.StartID == nil {
outChan <- &PaginateResponse{
ErrorCode: PaginateWrongStart,
ErrorText: []string{"StartID is nil"},
}
return
}

nextID := msg.StartID

for pageNum := uint64(0); pageNum < msg.NumPages; pageNum++ {
_, skipBlock, err := s.getBlockTx(nextID)

blocks := make([]*skipchain.SkipBlock, msg.PageSize)
if err != nil {
outChan <- &PaginateResponse{
ErrorCode: PaginatePageFailed,
ErrorText: []string{"failed to get the first block with ID",
fmt.Sprintf("%x", msg.StartID), fmt.Sprintf("%v", err)},
}
return
}
blocks[0] = skipBlock

if msg.Backward {
if len(skipBlock.BackLinkIDs) != 0 {
nextID = skipBlock.BackLinkIDs[0]
} else {
nextID = nil
}
} else {
if len(skipBlock.ForwardLink) != 0 {
nextID = skipBlock.ForwardLink[0].To
} else {
nextID = nil
}
}

for i := uint64(1); i < msg.PageSize; i++ {

if nextID == nil {
outChan <- &PaginateResponse{
ErrorCode: PaginateLinkMissing,
ErrorText: []string{"couldn't find a nextID for block",
fmt.Sprintf("%x", skipBlock.Hash), "page number",
fmt.Sprintf("%d", pageNum), "index", fmt.Sprintf("%d", i)},
}
return
}

_, skipBlock, err = s.getBlockTx(nextID)
if err != nil {
outChan <- &PaginateResponse{
ErrorCode: PaginateGetBlockFailed,
ErrorText: []string{"failed to get block with ID",
fmt.Sprintf("%x", nextID), "page number",
fmt.Sprintf("%d", pageNum), "index",
fmt.Sprintf("%d", i), fmt.Sprintf("%v", err)},
}
return
}
blocks[i] = skipBlock

if msg.Backward {
if len(skipBlock.BackLinkIDs) != 0 {
nextID = skipBlock.BackLinkIDs[0]
} else {
nextID = nil
}
} else {
if len(skipBlock.ForwardLink) != 0 {
nextID = skipBlock.ForwardLink[0].To
} else {
nextID = nil
}
}
}
response := &PaginateResponse{
Blocks: blocks,
PageNumber: pageNum,
Backward: msg.Backward,
}
// Allows the service to exit prematurely if the connection stops
select {
case <-stopChan:
return
default:
outChan <- response
}
}
// Waiting for the streaming connection to stop. This signal comes
// from onet, which sets it when the client closes the connection.
<-stopChan
}()

return outChan, stopChan, nil
}

0 comments on commit 793b6b0

Please sign in to comment.