Skip to content

Commit

Permalink
Bypros proxy service (#2439)
Browse files Browse the repository at this point in the history
* Fixes the pagination service

* Adds a proxy service

* Fixes pre-commit

* Fixes some code smell

* Adds comments, remove println

* Fixes lint

* Tidies go.mod

* Addresses Pierluca's comments

* Fixes tests

* Addresses code smells

* Adds a table for versioning

* Fixes the tests

* Updates the version SQL table

* Updates the version SQL table

* Updates the test + removes inclusion of bypros service in conode

I removed the TestProxyCatchUp_Genesis because somehow the chain started by the testing framework doesn't update on the genesis block.
  • Loading branch information
nkcr committed Mar 30, 2021
1 parent a4f66d7 commit e356209
Show file tree
Hide file tree
Showing 21 changed files with 2,764 additions and 10 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -22,3 +22,5 @@ external/java/.project

# cloned
/Coding

bypros/postgres
62 changes: 62 additions & 0 deletions bypros/README.md
@@ -0,0 +1,62 @@
# Bypros - Byzcoin Proxy

Bypros can listen on a conode and save the chain in a relational database. It
offers a read-only service call that can execute a provided SQL query.

Apart from listening and executing a query, there is also a service call to
catch up from a given block to the end of the chain.

This service is meant to be deployed and run in a sidecar pattern to a node, as
it targets only a specific node and not a roster, as traditionally done with
cothority services. This implies that you should manage and fully trust the node
you are talking to. The proxy reflects the state of a specific node, and not
necessarily the "chain" represented by the collective authority. Note that this
limitation can be alleviated with a layer handling connection to multiple nodes
(roster) instead of only one, but this is out of scope of this service.

## Limitations

The proxy should be used to target only one skipchain. Upon a first call to
follow or catch up, the service saves the skipchain ID and prevents any call to
another skipchain ID.

A single "follow" is allowed at the same time as this operation, once requested,
continuously runs in the background until a request to "unfollow" is sent. An
error is thrown if one tries to follow while the system is already following.

## Some technical details

### Run postgres in a docker

Running the database with docker is only recommended in development
environments.

Use the dockerfile in `storage/sqlstore` and follow the instructions written in
it.

### Export the database urls

Proxy expects 2 databases urls: one with read/write rights, and another with
only read rights.

```sh
export PROXY_DB_URL="postgres://bypros:docker@localhost:5432/bypros"
export PROXY_DB_URL_RO="postgres://proxy:1234@localhost:5432/bypros"
```

### Example: select unexecuted deferred instance IDs

This is the kind of query that can be sent:

```sql
select encode(instruction.contract_iid::bytea, 'hex'), instruction.contract_name from cothority.instruction
where instruction.action = 'spawn:deferred'
and instruction.contract_iid not in (
select instruction.contract_iid from cothority.instruction
join cothority.transaction on
transaction.transaction_id = instruction.transaction_id
where transaction.accepted = true and
instruction.action = 'invoke:deferred.execProposedTx'
)
group by instruction.contract_iid, instruction.contract_name
```
217 changes: 217 additions & 0 deletions bypros/api.go
@@ -0,0 +1,217 @@
package bypros

import (
"context"
"fmt"

"github.com/gorilla/websocket"
"go.dedis.ch/cothority/v3"
"go.dedis.ch/cothority/v3/skipchain"
"go.dedis.ch/onet/v3"
"go.dedis.ch/onet/v3/network"
"go.dedis.ch/onet/v4/log"
"go.dedis.ch/protobuf"
"golang.org/x/xerrors"
)

var overlayClient OverlayClient = onetOverlay{
onet.NewClient(cothority.Suite, ServiceName),
}

// OverlayClient defines the primitives needed by our client. That's all we need
// from the outer world.
type OverlayClient interface {
SendProtobuf(dst *network.ServerIdentity, msg interface{}, ret interface{}) error
OpenWS(url string) (WsHandler, error)
}

// WsHandler defines the primitives to handle a websocket connection.
type WsHandler interface {
Close() error
Write(messageType int, data []byte) error
Read() (messageType int, p []byte, err error)
}

// NewClient creates a new proxy client
func NewClient() *Client {
return &Client{OverlayClient: overlayClient}
}

// Client defines a proxy client
//
// - implements OverlayClient
type Client struct {
OverlayClient
}

// Follow sends a request to start following a node, ie. listening to new blocks
// and updating the database accordingly.
func (c *Client) Follow(host, target *network.ServerIdentity,
scID skipchain.SkipBlockID) error {

req := Follow{
Target: target,
ScID: scID,
}

resp := EmptyReply{}

err := c.SendProtobuf(host, &req, &resp)
if err != nil {
return xerrors.Errorf("failed to send follow request: %v", err)
}

return nil
}

// Unfollow sends a request to stop following a node.
func (c *Client) Unfollow(host *network.ServerIdentity) error {
req := Unfollow{}
resp := EmptyReply{}

err := c.SendProtobuf(host, &req, &resp)
if err != nil {
return xerrors.Errorf("failed to send follow request: %v", err)
}

return nil
}

// Query sends a query request. The query must be a read-only SQL query, for
// example:
// Select * from cothority.block
func (c *Client) Query(host *network.ServerIdentity, query string) ([]byte, error) {
req := Query{
Query: query,
}
resp := QueryReply{}

err := c.SendProtobuf(host, &req, &resp)
if err != nil {
return nil, xerrors.Errorf("failed to send query request: %v", err)
}

return resp.Result, nil
}

// CatchUP sends a request to catch up from a particular block. A catch up will
// update the database from the specified block until the end of the chain.
// Every specifies the interval at which updates are sent.
func (c *Client) CatchUP(ctx context.Context, host, target *network.ServerIdentity,
scID skipchain.SkipBlockID, fromBlock skipchain.SkipBlockID,
every int) (<-chan CatchUpResponse, error) {

req := CatchUpMsg{
ScID: scID,
Target: target,
FromBlock: fromBlock,
UpdateEvery: every,
}

apiEndpoint, err := getWsAddr(req.Target)
if err != nil {
return nil, xerrors.Errorf("failed to get ws addr: %v", err)
}

apiURL := fmt.Sprintf("%s/%s/%s", apiEndpoint, ServiceName, "CatchUpMsg")

ws, err := c.OverlayClient.OpenWS(apiURL)
if err != nil {
return nil, xerrors.Errorf("failed to open ws: %v", err)
}

buf, err := protobuf.Encode(&req)
if err != nil {
return nil, xerrors.Errorf("failed to encode streaming request: %v", err)
}

err = ws.Write(websocket.BinaryMessage, buf)
if err != nil {
return nil, xerrors.Errorf("failed to send streaming request: %v", err)
}

outChan := make(chan CatchUpResponse)

go listenCatchup(ws, outChan)

return outChan, nil
}

// listenCatchup listens for messages on the ws and writes the responses to the
// outChan. It closes the outChan once a done message is received.
func listenCatchup(ws WsHandler, outChan chan CatchUpResponse) {
defer func() {
err := ws.Write(websocket.CloseMessage, nil)
if err != nil {
log.Warnf("failed to send close: %v", err)
}
ws.Close()
}()

for {
_, buf, err := ws.Read()
if err != nil {
outChan <- CatchUpResponse{
Err: fmt.Sprintf("failed to read response: %v", err),
}
return
}

resp := CatchUpResponse{}

err = protobuf.Decode(buf, &resp)
if err != nil {
outChan <- CatchUpResponse{
Err: fmt.Sprintf("failed to decode response: %v", err),
}
return
}

outChan <- resp
if resp.Done {
close(outChan)
return
}
}
}

// onetOverlay provides an overlay implementation based on onet and gorilla.
//
// - implements OverlayClient
type onetOverlay struct {
*onet.Client
}

// OpenWS implements OverlayClient
func (o onetOverlay) OpenWS(url string) (WsHandler, error) {
ws, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return nil, xerrors.Errorf("failed to dial %s: %v", url, err)
}

return gorillaWs{
client: ws,
}, nil
}

// gorillaWs is a websocket handler that uses the gorilla package
//
// - implements WsHandler
type gorillaWs struct {
client *websocket.Conn
}

// CloseWS implements WsHandler
func (g gorillaWs) Close() error {
return g.client.Close()
}

// SendWSMessage implements WsHandler
func (g gorillaWs) Write(messageType int, data []byte) error {
return g.client.WriteMessage(messageType, data)
}

// ReadWSMessage implements WsHandler
func (g gorillaWs) Read() (messageType int, p []byte, err error) {
return g.client.ReadMessage()
}

0 comments on commit e356209

Please sign in to comment.