Skip to content

Commit

Permalink
support delta, cache recovery, cache empty proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed May 2, 2024
1 parent d472969 commit b421366
Show file tree
Hide file tree
Showing 32 changed files with 1,782 additions and 1,948 deletions.
6 changes: 1 addition & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ require (
github.com/FZambia/statik v0.1.2-0.20180217151304-b9f012bb2a1b
github.com/FZambia/tarantool v0.3.1
github.com/FZambia/viper-lite v0.0.0-20220110144934-1899f66c7d0e
github.com/Yiling-J/theine-go v0.3.2
github.com/centrifugal/centrifuge v0.32.2-0.20240429160900-ddd8e2efac42
github.com/centrifugal/centrifuge v0.32.2-0.20240501054446-d4699c79d655
github.com/centrifugal/protocol v0.12.2-0.20240429145950-b906e73562fe
github.com/cristalhq/jwt/v5 v5.4.0
github.com/gobwas/glob v0.2.3
Expand Down Expand Up @@ -48,14 +47,11 @@ require (
)

require (
github.com/gammazero/deque v0.2.1 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/pierrec/lz4/v4 v4.1.19 // indirect
github.com/shadowspore/fossil-delta v0.0.0-20240102155221-e3a8590b820b // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/mock v0.4.0 // indirect
)

Expand Down
14 changes: 2 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ github.com/FZambia/tarantool v0.3.1 h1:M6FiJrUBu1TvE8aySwSu47He7aYrJvufr+VPzP8FP
github.com/FZambia/tarantool v0.3.1/go.mod h1:YHnvW/H6TPJP04s3RtbBFqvxTvqfYnPBd+TVM1GWdsw=
github.com/FZambia/viper-lite v0.0.0-20220110144934-1899f66c7d0e h1:COyWHWCYUotWRo+Z1Lk8B9NDceEybV61C9diY7YVj8g=
github.com/FZambia/viper-lite v0.0.0-20220110144934-1899f66c7d0e/go.mod h1:hx7D3T4iFXiy0QWL4m3yNfzz5CQCtbV5yNdE4UlWo0s=
github.com/Yiling-J/theine-go v0.3.2 h1:XcSdMPV9DwBD9gqqSxbBfVJnP8CCiqNSqp3C6YpmMHI=
github.com/Yiling-J/theine-go v0.3.2/go.mod h1:ygLXqrWPZT/a+PzK5hQ0+a6gu0lpAY5IudTcgnPleqI=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/centrifugal/centrifuge v0.32.2-0.20240429160900-ddd8e2efac42 h1:ERsqwG+X6plELoINjVMdSuC+l+kman6LyKszb6eukCM=
github.com/centrifugal/centrifuge v0.32.2-0.20240429160900-ddd8e2efac42/go.mod h1:KyjlD2F8E8whm19DEYV6s0TMtN8P3v06b82tCdx/zXs=
github.com/centrifugal/centrifuge v0.32.2-0.20240501054446-d4699c79d655 h1:RSM90BhhyQMZ/hY9CHj/jI+uZjnub8PCjs0Mf/7E6Wc=
github.com/centrifugal/centrifuge v0.32.2-0.20240501054446-d4699c79d655/go.mod h1:KyjlD2F8E8whm19DEYV6s0TMtN8P3v06b82tCdx/zXs=
github.com/centrifugal/protocol v0.12.2-0.20240429145950-b906e73562fe h1:uXsl6MWJZMlk42wfVDOK0z596x9JqmkvAqlWzQ830qU=
github.com/centrifugal/protocol v0.12.2-0.20240429145950-b906e73562fe/go.mod h1:lM54PGU/u5WupYSb755Zv6tZ2ju1SqNKCp6A4s0DeG4=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand All @@ -33,8 +31,6 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Expand Down Expand Up @@ -94,8 +90,6 @@ github.com/justinas/alice v1.2.0 h1:+MHSA/vccVCF4Uq37S42jwlkvI2Xzl7zTPCN5BnZNVo=
github.com/justinas/alice v1.2.0/go.mod h1:fN5HRH/reO/zrUflLfTN43t3vXvKzvZIENsNEe7i7qA=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down Expand Up @@ -202,10 +196,6 @@ github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.50.0 h1:zvpPXY7RfYAGSdYQLjp6zxdJNSYD/+FFoCTQN9IPxBs=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.50.0/go.mod h1:BMn8NB1vsxTljvuorms2hyOs8IBuuBEq0pl7ltOfy30=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.50.0 h1:cEPbyTSEHlQR89XVlyo78gqluF8Y3oMeBkXGWzQsfXY=
Expand Down
15 changes: 14 additions & 1 deletion internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,17 @@ func (h *Executor) Publish(ctx context.Context, cmd *PublishRequest) *PublishRes
historyTTL = 0
}

delta := cmd.Delta
if chOpts.DeltaPublish {
delta = true
}

result, err := h.node.Publish(
cmd.Channel, data,
centrifuge.WithHistory(historySize, time.Duration(historyTTL), time.Duration(historyMetaTTL)),
centrifuge.WithTags(cmd.GetTags()),
centrifuge.WithIdempotencyKey(cmd.GetIdempotencyKey()),
centrifuge.WithDelta(delta),
)
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error publishing data to channel", map[string]any{"error": err.Error(), "channel": cmd.Channel}))
Expand Down Expand Up @@ -279,11 +285,17 @@ func (h *Executor) Broadcast(ctx context.Context, cmd *BroadcastRequest) *Broadc
historyTTL = 0
}

delta := cmd.Delta
if chOpts.DeltaPublish {
delta = true
}

result, err := h.node.Publish(
ch, data,
centrifuge.WithHistory(historySize, time.Duration(historyTTL), time.Duration(historyMetaTTL)),
centrifuge.WithTags(cmd.GetTags()),
centrifuge.WithIdempotencyKey(cmd.GetIdempotencyKey()),
centrifuge.WithDelta(delta),
)
resp := &PublishResponse{}
if err == nil {
Expand Down Expand Up @@ -781,7 +793,8 @@ func (h *Executor) Channels(ctx context.Context, cmd *ChannelsRequest) *Channels
}

func toAPIErr(err error) *Error {
if apiErr, ok := err.(*Error); ok {
var apiErr *Error
if errors.As(err, &apiErr) {
return apiErr
}
return ErrorInternal
Expand Down

0 comments on commit b421366

Please sign in to comment.