Skip to content

Commit

Permalink
Remote Connection URL (#150)
Browse files Browse the repository at this point in the history
* "Remote Connection" button and model

* share link is in working state

* works with separate account

* request works unauth

* add home router

* connection save message

* typo

* destination is immediately created

* requested changes
  • Loading branch information
breadchris committed Apr 5, 2024
1 parent ef94427 commit c5f0a78
Show file tree
Hide file tree
Showing 16 changed files with 573 additions and 242 deletions.
14 changes: 11 additions & 3 deletions pkg/api/destinations.go
Expand Up @@ -60,13 +60,21 @@ func (a *ScratchDataAPIStruct) CreateDestination(w http.ResponseWriter, r *http.
http.Error(w, "unable to get user", http.StatusInternalServerError)
return
}
newDest, err := a.storageServices.Database.CreateDestination(r.Context(), user.ID, dest.Name, dest.Type, dest.Settings)
newDest, err := a.storageServices.Database.CreateDestination(
r.Context(),
user.ID,
dest.Name,
dest.Type,
dest.Settings,
)
if err != nil {
render.Status(r, http.StatusInternalServerError)
render.PlainText(w, r, err.Error())
return
}

newDest.Settings = nil
render.JSON(w, r, newDest)
dstCfg := newDest.ToConfig()
dstCfg.Settings = map[string]any{}

render.JSON(w, r, dstCfg)
}
7 changes: 6 additions & 1 deletion pkg/storage/database/database.go
Expand Up @@ -17,10 +17,15 @@ type Database interface {

GetDestinations(ctx context.Context, teamId uint) ([]models.Destination, error)
GetDestination(ctx context.Context, teamId, destId uint) (models.Destination, error)
CreateDestination(ctx context.Context, teamId uint, name string, destType string, settings map[string]any) (config.Destination, error)
CreateDestination(ctx context.Context, teamId uint, name string, destType string, settings map[string]any) (models.Destination, error)
DeleteDestination(ctx context.Context, teamId uint, destId int64) error
UpdateDestination(ctx context.Context, dest models.Destination) error
GetDestinationCredentials(ctx context.Context, dbID int64) (models.Destination, error)

CreateConnectionRequest(ctx context.Context, dest models.Destination) (models.ConnectionRequest, error)
GetConnectionRequest(ctx context.Context, requestId uuid.UUID) (models.ConnectionRequest, error)
DeleteConnectionRequest(ctx context.Context, id uint) error

AddAPIKey(ctx context.Context, destId int64, hashedAPIKey string) error
GetAPIKeyDetails(ctx context.Context, hashedAPIKey string) (models.APIKey, error)

Expand Down
63 changes: 49 additions & 14 deletions pkg/storage/database/gorm/gorm.go
Expand Up @@ -11,14 +11,13 @@ import (
"fmt"
"time"

"github.com/google/uuid"
"github.com/rs/zerolog/log"
"github.com/scratchdata/scratchdata/pkg/config"
"github.com/scratchdata/scratchdata/pkg/storage/database/models"
"github.com/scratchdata/scratchdata/pkg/util"
"gorm.io/datatypes"
"gorm.io/driver/postgres"

"github.com/google/uuid"
"github.com/rs/zerolog/log"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
Expand Down Expand Up @@ -58,6 +57,7 @@ func NewGorm(
&models.Destination{},
&models.APIKey{},
&models.Message{},
&models.ConnectionRequest{},
)
if err != nil {
return nil, err
Expand All @@ -73,6 +73,31 @@ func (s *Gorm) VerifyAdminAPIKey(ctx context.Context, apiKey string) bool {
return false
}

func (s *Gorm) CreateConnectionRequest(ctx context.Context, dest models.Destination) (models.ConnectionRequest, error) {
requestId := uuid.New().String()
req := models.ConnectionRequest{
RequestID: requestId,
DestinationID: dest.ID,
// TODO breadchris make configurable
Expiration: time.Now().Add(time.Hour * 24 * 7),
}

res := s.db.Create(&req)
if res.Error != nil {
return models.ConnectionRequest{}, res.Error
}
return req, nil
}

func (s *Gorm) GetConnectionRequest(ctx context.Context, requestId uuid.UUID) (models.ConnectionRequest, error) {
var req models.ConnectionRequest
res := s.db.Preload("Destination").First(&req, "request_id = ?", requestId.String())
if res.Error != nil {
return models.ConnectionRequest{}, res.Error
}
return req, nil
}

func (s *Gorm) CreateShareQuery(ctx context.Context, destId int64, query string, expires time.Duration) (queryId uuid.UUID, err error) {
id := uuid.New()
link := models.ShareLink{
Expand Down Expand Up @@ -143,26 +168,36 @@ func (s *Gorm) CreateDestination(
name string,
destType string,
settings map[string]any,
) (config.Destination, error) {
) (models.Destination, error) {
// TODO breadchris what fields are considered unique?

dest := &models.Destination{
dest := models.Destination{
TeamID: teamId,
Name: name,
Type: destType,
Settings: datatypes.NewJSONType(settings),
}

res := s.db.Create(dest)
if res.Error != nil {
return config.Destination{}, res.Error
result := s.db.Create(&dest)
if result.Error != nil {
return models.Destination{}, result.Error
}
return config.Destination{
ID: int64(dest.ID),
Name: name,
Type: destType,
Settings: settings,
}, nil

if result.RowsAffected != 1 {
return models.Destination{}, errors.New("unable to create destination")
}

return dest, nil
}

func (s *Gorm) DeleteConnectionRequest(ctx context.Context, id uint) error {
res := s.db.Delete(&models.ConnectionRequest{}, "id = ?", id)
return res.Error
}

func (s *Gorm) UpdateDestination(ctx context.Context, dest models.Destination) error {
res := s.db.Save(&dest)
return res.Error
}

func (s *Gorm) CreateTeam(name string) (*models.Team, error) {
Expand Down
22 changes: 16 additions & 6 deletions pkg/storage/database/gorm/queue.go
Expand Up @@ -2,6 +2,7 @@ package gorm

import (
"encoding/json"
"errors"
"time"

"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -29,17 +30,21 @@ func (db *Gorm) Enqueue(messageType models.MessageType, m any) (*models.Message,
func (db *Gorm) Dequeue(messageType models.MessageType, claimedBy string) (*models.Message, bool) {
var message models.Message

res := db.db.Transaction(func(tx *gorm.DB) error {
err := db.db.Transaction(func(tx *gorm.DB) error {

// This locking does not work with SQLite. Should use UPDATE .. WHERE status = new LIMIT 1 RESULT
findRes := tx.Clauses(clause.Locking{Strength: clause.LockingStrengthUpdate, Options: clause.LockingOptionsSkipLocked}).
Where("status = ? AND message_type = ?", models.New, messageType).
First(&message)
Find(&message)

if findRes.Error != nil {
return findRes.Error
}

if findRes.RowsAffected == 0 {
return gorm.ErrRecordNotFound
}

message.Status = models.Claimed
message.ClaimedAt = time.Now()
message.ClaimedBy = claimedBy
Expand All @@ -48,13 +53,18 @@ func (db *Gorm) Dequeue(messageType models.MessageType, claimedBy string) (*mode
if saveRes.Error != nil {
return saveRes.Error
}

return nil

})

if res != nil {
log.Error().Err(res).Any("message_type", messageType).Str("claimed_by", claimedBy).Msg("Unable to query for messages")
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, false
}
log.Error().
Err(err).
Any("message_type", messageType).
Str("claimed_by", claimedBy).
Msg("Unable to query for messages")
return nil, false
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/database/models/models.go
Expand Up @@ -58,6 +58,14 @@ func (d Destination) ToConfig() config.Destination {
}
}

type ConnectionRequest struct {
gorm.Model
RequestID string `gorm:"index,unique"`
DestinationID uint
Destination Destination
Expiration time.Time
}

type APIKey struct {
gorm.Model
DestinationID uint
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/database/static/static.go
Expand Up @@ -44,7 +44,7 @@ func NewStaticDatabase(conf config.Database, destinations []config.Destination,
}

for _, apiKey := range destination.APIKeys {
err = rc.AddAPIKey(ctx, dest.ID, rc.Hash(apiKey))
err = rc.AddAPIKey(ctx, int64(dest.ID), rc.Hash(apiKey))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit c5f0a78

Please sign in to comment.