Skip to content

Commit

Permalink
fix: panics when calling BlockSlice (#128)
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii committed Sep 15, 2022
1 parent 7d6e9e0 commit d7a49c4
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
5 changes: 3 additions & 2 deletions rx/flux/flux.go
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/jjeffcaii/reactor-go/flux"
"github.com/jjeffcaii/reactor-go/scheduler"

"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
)
Expand All @@ -22,10 +23,10 @@ type Sink interface {
Error(e error)
}

// Flux represents represents a reactive sequence of 0..N items.
// Flux represents a reactive sequence of 0..N items.
type Flux interface {
rx.Publisher
// Take take only the first N values from this Flux, if available.
// Take takes only the first N values from this Flux, if available.
Take(n int) Flux
// Filter evaluate each source value against the given Predicate.
// If the predicate test succeeds, the value is emitted.
Expand Down
24 changes: 18 additions & 6 deletions rx/flux/proxy.go
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/flux"
"github.com/jjeffcaii/reactor-go/scheduler"

"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
)
Expand Down Expand Up @@ -76,12 +77,19 @@ func (p proxy) ToChan(ctx context.Context, cap int) (<-chan payload.Payload, <-c
err <- reactor.ErrSubscribeCancelled
}
}).
Map(func(any reactor.Any) (reactor.Any, error) {
return payload.Clone(any.(payload.Payload)), nil
}).
SubscribeWithChan(ctx, ch, err)
return ch, err
}

func (p proxy) BlockFirst(ctx context.Context) (first payload.Payload, err error) {
v, err := p.Flux.BlockFirst(ctx)
v, err := p.Flux.
Map(func(any reactor.Any) (reactor.Any, error) {
return payload.Clone(any.(payload.Payload)), nil
}).
BlockFirst(ctx)
if err != nil {
return
}
Expand All @@ -92,14 +100,18 @@ func (p proxy) BlockFirst(ctx context.Context) (first payload.Payload, err error
}

func (p proxy) BlockLast(ctx context.Context) (last payload.Payload, err error) {
v, err := p.Flux.BlockLast(ctx)
v, err := p.Flux.
Map(func(any reactor.Any) (reactor.Any, error) {
return payload.Clone(any.(payload.Payload)), nil
}).
BlockLast(ctx)
if err != nil {
return
}
if v == nil {
return
if v != nil {
last = v.(payload.Payload)
}
last = v.(payload.Payload)

return
}

Expand All @@ -119,7 +131,7 @@ func (p proxy) BlockSlice(ctx context.Context) (results []payload.Payload, err e
Subscribe(
ctx,
reactor.OnNext(func(v reactor.Any) error {
results = append(results, v.(payload.Payload))
results = append(results, payload.Clone(v.(payload.Payload)))
return nil
}),
reactor.OnError(func(e error) {
Expand Down

0 comments on commit d7a49c4

Please sign in to comment.