Skip to content

Commit

Permalink
use sqlite for static db (#148)
Browse files Browse the repository at this point in the history
* use sqlite for static db

* remove bootstrap logic

---------

Co-authored-by: poundifdef <jay@scratchdata.com>
  • Loading branch information
poundifdef and poundifdef committed Apr 4, 2024
1 parent 14a5beb commit d2975be
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 234 deletions.
5 changes: 0 additions & 5 deletions config.yaml
Expand Up @@ -36,11 +36,6 @@ cache:
database:
type: static

# type: sqlite
# settings:
# dsn: "local.db"
# default_user: "your_email@example.com"

dashboard:
enabled: true

Expand Down
2 changes: 1 addition & 1 deletion pkg/api/auth.go
Expand Up @@ -86,7 +86,7 @@ func (a *ScratchDataAPIStruct) Authenticator(ja *jwtauth.JWTAuth) func(http.Hand
return
}

user := a.storageServices.Database.GetUser(int64(userId.(float64)))
user := a.storageServices.Database.GetUser(uint(userId.(float64)))
if user.ID <= 0 {
log.Error().Msg("User not found")
http.Redirect(w, r, "/login", http.StatusTemporaryRedirect)
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/database/database.go
Expand Up @@ -27,7 +27,10 @@ type Database interface {
CreateShareQuery(ctx context.Context, destId int64, query string, expires time.Duration) (queryId uuid.UUID, err error)
GetShareQuery(ctx context.Context, queryId uuid.UUID) (models.SharedQuery, bool)

GetUser(int64) *models.User
CreateTeam(name string) (*models.Team, error)
AddUserToTeam(userId uint, teamId uint) error

GetUser(uint) *models.User
GetTeamId(userId uint) (uint, error)
CreateUser(email string, source string, details string) (*models.User, error)

Expand Down
54 changes: 29 additions & 25 deletions pkg/storage/database/gorm/gorm.go
Expand Up @@ -65,30 +65,6 @@ func NewGorm(

var teamCount int64
db.Model(&models.Team{}).Count(&teamCount)
if teamCount == 0 {
if rc.DefaultUser == "" {
return nil, errors.New("Must specify a default_user in the DB settings file")
}

team := models.Team{Name: rc.DefaultUser}
db.Create(&team)

settings := map[string]any{"file": "data.duckdb"}

destination := models.Destination{
TeamID: team.ID,
Name: "Local DuckDB",
Type: "duckdb",
Settings: datatypes.NewJSONType(settings),
}
db.Create(&destination)

apiKey := models.APIKey{DestinationID: destination.ID, HashedAPIKey: rc.Hash("local")}
db.Create(&apiKey)

user := models.User{Teams: []*models.Team{&team}, Email: rc.DefaultUser, AuthType: "google"}
db.Create(&user)
}

return rc, nil
}
Expand Down Expand Up @@ -189,6 +165,34 @@ func (s *Gorm) CreateDestination(
}, nil
}

func (s *Gorm) CreateTeam(name string) (*models.Team, error) {
team := &models.Team{
Name: name,
}

res := s.db.Create(team)
if res.Error != nil {
return nil, res.Error
}

return team, nil
}

func (s *Gorm) AddUserToTeam(userId uint, teamId uint) error {
user := s.GetUser(userId)
if user == nil {
return errors.New("user not found")
}

t := models.Team{
Model: gorm.Model{
ID: teamId,
},
}
res := s.db.Model(user).Association("Teams").Append([]models.Team{t})
return res
}

func (s *Gorm) GetDestinations(c context.Context, userId uint) ([]models.Destination, error) {
teamId, err := s.GetTeamId(userId)
if err != nil {
Expand Down Expand Up @@ -222,7 +226,7 @@ func (s *Gorm) Hash(str string) string {
return hex.EncodeToString(hash[:])
}

func (s *Gorm) GetUser(userId int64) *models.User {
func (s *Gorm) GetUser(userId uint) *models.User {
var user models.User
tx := s.db.First(&user, userId)
if tx.Error != nil {
Expand Down
230 changes: 28 additions & 202 deletions pkg/storage/database/static/static.go
Expand Up @@ -2,229 +2,55 @@ package static

import (
"context"
"encoding/json"
"errors"
"sync"
"time"

"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/scratchdata/scratchdata/pkg/config"
"github.com/scratchdata/scratchdata/pkg/storage/database/models"
"gorm.io/datatypes"
"github.com/scratchdata/scratchdata/pkg/storage/database/gorm"
)

var StaticDBError = errors.New("Cannot make changes when using static config. Update config file or use a database instead.")
func NewStaticDatabase(conf config.Database, destinations []config.Destination, apiKeys []config.APIKey) (*gorm.Gorm, error) {
ctx := context.TODO()

type StaticDatabase struct {
conf config.Database
destinations []config.Destination
apiKeyToDestination map[string]uint
adminAPIKeys []config.APIKey
defaultSettings := gorm.Gorm{DSN: "file::memory:?cache=shared"}
defaultSettingsMap := map[string]any{}
mapstructure.Decode(&defaultSettings, &defaultSettingsMap)

ids uint
mu sync.Mutex
queue map[models.MessageType][]*models.Message
}

func (db *StaticDatabase) DeleteDestination(ctx context.Context, userId uint, destId int64) error {
return StaticDBError
}

func NewStaticDatabase(conf config.Database, destinations []config.Destination, apiKeys []config.APIKey) (*StaticDatabase, error) {
rc := StaticDatabase{
conf: conf,
destinations: destinations,
apiKeyToDestination: map[string]uint{},
adminAPIKeys: apiKeys,

queue: make(map[models.MessageType][]*models.Message),
}

for i, destination := range destinations {
for _, apiKey := range destination.APIKeys {
rc.apiKeyToDestination[apiKey] = uint(i)
}
}

return &rc, nil
}

func (db *StaticDatabase) Hash(s string) string {
return s
}

func (db *StaticDatabase) GetDestinations(ctx context.Context, teamId uint) ([]models.Destination, error) {
rc := make([]models.Destination, 0)
for _, dest := range db.destinations {
rc = append(rc, models.Destination{
TeamID: 0,
Type: dest.Type,
Name: dest.Name,
Settings: datatypes.NewJSONType(dest.Settings),
})
}
return rc, nil
}

func (db *StaticDatabase) GetDestination(ctx context.Context, teamId, destId uint) (models.Destination, error) {
dest := db.destinations[destId]
rc := models.Destination{
TeamID: 0,
Type: dest.Type,
Name: dest.Name,
Settings: datatypes.NewJSONType(dest.Settings),
}
rc.TeamID = teamId
return rc, nil
}

func (db *StaticDatabase) AddAPIKey(ctx context.Context, destId int64, key string) error {
return StaticDBError
}

func (db *StaticDatabase) GetTeamId(userId uint) (uint, error) {
return 0, nil
}

func (db *StaticDatabase) CreateDestination(ctx context.Context, teamId uint, name string, destType string, settings map[string]any) (config.Destination, error) {
return config.Destination{}, StaticDBError
}

func (db *StaticDatabase) VerifyAdminAPIKey(ctx context.Context, apiKey string) bool {
for _, key := range db.adminAPIKeys {
if key.Key == apiKey {
return true
}
}

return false
}

func (db *StaticDatabase) CreateShareQuery(ctx context.Context, destId int64, query string, expires time.Duration) (queryId uuid.UUID, err error) {
return uuid.Nil, StaticDBError
}

func (db *StaticDatabase) GetShareQuery(ctx context.Context, queryId uuid.UUID) (models.SharedQuery, bool) {
return models.SharedQuery{}, false
}

func (db *StaticDatabase) GetAPIKeyDetails(ctx context.Context, apiKey string) (models.APIKey, error) {
dbId, ok := db.apiKeyToDestination[apiKey]
if !ok {
return models.APIKey{}, errors.New("invalid API key")
}
rc := models.APIKey{
DestinationID: dbId,
}
return rc, nil
}

func (db *StaticDatabase) GetDestinationCredentials(ctx context.Context, dbID int64) (models.Destination, error) {
dest := db.destinations[dbID]
rc := models.Destination{
TeamID: 0,
Type: dest.Type,
Name: dest.Name,
Settings: datatypes.NewJSONType(dest.Settings),
gormConf := config.Database{
Type: "sqlite",
Settings: defaultSettingsMap,
}
rc.TeamID = 0
return rc, nil
}

func (db *StaticDatabase) CreateUser(email string, source string, details string) (*models.User, error) {
user := &models.User{
Email: "scratchdata@example.com",
AuthType: "static",
}
user.ID = 1
return user, nil
}

func (db *StaticDatabase) GetUser(int64) *models.User {
user := &models.User{
Email: "scratchdata@example.com",
AuthType: "static",
}
user.ID = 1
return user
}

func (db *StaticDatabase) Enqueue(messageType models.MessageType, m any) (*models.Message, error) {
db.mu.Lock()
defer db.mu.Unlock()

queue, ok := db.queue[messageType]
if !ok {
queue = make([]*models.Message, 0)
db.queue[messageType] = queue
}

mStr, err := json.Marshal(m)
rc, err := gorm.NewGorm(gormConf)
if err != nil {
return nil, err
}

message := &models.Message{
MessageType: messageType,
Status: models.New,
Message: string(mStr),
team, err := rc.CreateTeam("Team Scratch")
if err != nil {
return nil, err
}

db.ids++
message.ID = db.ids

queue = append(queue, message)
db.queue[messageType] = queue

return message, nil
}

func (db *StaticDatabase) Dequeue(messageType models.MessageType, claimedBy string) (*models.Message, bool) {
db.mu.Lock()
defer db.mu.Unlock()

queue, ok := db.queue[messageType]
if !ok {
return nil, false
user, err := rc.CreateUser("scratch@example.com", "static", "")
if err != nil {
return nil, err
}

if len(queue) == 0 {
return nil, false
}
rc.AddUserToTeam(user.ID, team.ID)

for _, message := range queue {
if message.Status == models.Claimed {
continue
for _, destination := range destinations {
dest, err := rc.CreateDestination(ctx, team.ID, destination.Name, destination.Type, destination.Settings)
if err != nil {
return nil, err
}
message.ClaimedAt = time.Now()
message.ClaimedBy = claimedBy
message.Status = models.Claimed

return message, true
}

return nil, false
}

func (db *StaticDatabase) Delete(id uint) error {
db.mu.Lock()
defer db.mu.Unlock()

for k, queue := range db.queue {
found := -1
for i, message := range queue {
if message.ID == id {
found = i
break
for _, apiKey := range destination.APIKeys {
err = rc.AddAPIKey(ctx, dest.ID, rc.Hash(apiKey))
if err != nil {
return nil, err
}
}

if found >= 0 {
newQueue := append(queue[:found], queue[found+1:]...)
db.queue[k] = newQueue
break
}
}

return nil
return rc, nil

}

0 comments on commit d2975be

Please sign in to comment.