=> operator does not work in place of Via #79

Open
vasily-kirichenko opened this issue Dec 20, 2017 · 7 comments

Comments

@vasily-kirichenko
Contributor

open Akka
open Akka.Actor
open Akka.Streams
open Akka.Streams.Dsl
open Akkling
open Akkling.Streams
open Akkling.Streams.Operators

type IUser = interface end
type IHashtagEntity = interface end

type ITweet = 
    abstract Hashtags: IHashtagEntity list
    abstract CreatedBy: IUser

[<EntryPoint>]
let main _ = 
    use system = ActorSystem.Create("test")
    let mat = system.Materializer()
    
    GraphDsl.Create (fun b ->
        let broadcast = b.Add(Broadcast<ITweet>(2))
        

        b.From(broadcast.Out(0)).Via(Flow.id |> Flow.map (fun (tweet: ITweet) -> tweet.CreatedBy)) |> ignore
        b.From(broadcast.Out(0)) => (Flow.id |> Flow.map (fun (tweet: ITweet) -> tweet.CreatedBy)) |> ignore

        
        ClosedShape.Instance
    ) 
    |> RunnableGraph.FromGraph
    |> Graph.run mat
    |> ignore
    0

I expect Flow<ITweet, IUser, NotUsed> here instead of Flow<ITweet, ITweet, NotUsed>.

@Horusiath
Owner

I'm not sure right now, but I guess that the current definition of => may not have been precise enough, and F# type inference came to the wrong conclusion. So far I still struggle to make the Graph DSL F#-friendly.

Unfortunately, the original Akka GraphDSL uses almost all Scala-specific features in one place (implicit params, implicit type casting, custom operators with multiple overloads, etc.). It's hard to create something remotely similar in any other language.

@vasily-kirichenko
Contributor Author

I see. So there is no way to make it work with => operator? Maybe add a type hint somewhere?

@Horusiath
Owner

Yes. I usually try to keep public API method signatures with all type definitions set explicitly to avoid mistakes such as this one.

@ingted

ingted commented Dec 29, 2020

let inline (=>) (fo:GraphDsl.ForwardOps< ^T, ^S>) (junction:Flow< ^T, ^V, NotUsed> ) =
    fo.Via junction

Hi @vasily-kirichenko, does this work for you? (I know it is ugly...)

And if I execute code like this:

#r "nuget: Akka.Serialization.Hyperion"
#r "nuget: Akka.Cluster"
#r "nuget: Akka.Remote"
#r "nuget: Akka.Streams"
#r "nuget: Akka.Cluster.Tools"
#r "nuget: Akkling"
#r "nuget: Akkling.Streams"
#r "nuget: Akkling.Cluster.Sharding"
#r "nuget: Microsoft.Extensions.Logging"
#r "nuget: Microsoft.Extensions.Logging.Abstractions"
#r "nuget: System.Configuration.ConfigurationManager"

open Akka
open Akka.Actor
open Akka.Streams
open Akka.Streams.Dsl
open Akkling
open Akkling.Streams
open Akkling.Streams.Operators

type IUser = interface end
type IHashtagEntity = interface end

type ITweet = 
    abstract Hashtags: IHashtagEntity list
    abstract CreatedBy: IUser

let system = ActorSystem.Create("test")
let mat = system.Materializer()

GraphDsl.Create (fun b ->
    let broadcast = b.Add(Broadcast<ITweet>(2))
    b.From(broadcast.Out(0)).Via(Flow.id |> Flow.map (fun (tweet: ITweet) -> tweet.CreatedBy)) |> ignore
    
    ClosedShape.Instance
) 
|> RunnableGraph.FromGraph
|> Graph.run mat
|> ignore

It throws an exception:

System.ArgumentException: CombinedModule requires shape with same ports to replace
Parameter name: shape
   at Akka.Streams.Implementation.CompositeModule.ReplaceShape(Shape shape)
   at Akka.Streams.Dsl.GraphDsl.CreateMaterialized[TShape,TMat](Func`2 buildBlock)
   at Akka.Streams.Dsl.GraphDsl.Create[TShape](Func`2 buildBlock)
   at <StartupCode$FSI_0006>.$FSI_0006.main@()
Stopped due to error

Excuse me, do you know how to fix it?

@Horusiath
Owner

@ingted in your code you defined Broadcast<ITweet>(2), which means that the broadcast has two outputs, but in the next line you only connect one. In Akka.Streams, unless you use dynamic stages, all inputs/outputs have to be connected when creating a graph.

@ingted

ingted commented Jan 11, 2021

I changed it to 1, and the exception is still there.

I am sorry, I think I am not that familiar with Akka Streams.
I will study more, debug this, and come back to discuss afterwards. ^^

Thank you!

@Horusiath
Owner

Horusiath commented Jan 13, 2021

  1. If you have Broadcast(1), then you don't need a broadcast at all.

  2. Think of stages as building blocks with inputs and outputs. Graph.create requires you to either join all of them together (so that there are no dangling inputs/outputs) or expose them outside:

    • In your case, Flow.id |> Flow.map (..) |> ignore is not closed, as the output of Flow.map is still dangling (just ignored). After the Via call, it needs to be connected, e.g. to a Sink.ignore stage (if you don't want to do anything with it).
    • You defined the broadcast and connected its only output, but still left its input disconnected from the rest of the graph. You may want to expose it outside by using SinkShape<_>(broadcast.In), which will produce a sink. You can then connect that sink to the source of your choice to materialize it into a fully functioning graph.

You may also want to take a look at the example of using Akkling graph API.
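
The advice above (connect every output, expose the unconnected input as a sink) can be sketched roughly as follows. This is a minimal, untested sketch under a few assumptions: it uses the plain Akka.NET API (Flow.Create, Select, Sink.Ignore, Source.Empty) instead of Akkling's Flow and Sink modules, so Akkling is deliberately not opened; the names tweetSink, toUser, ignoreUsers and ignoreTweets are made up for illustration; and it keeps Broadcast<ITweet>(2) so that both outputs have something to connect to.

```fsharp
#r "nuget: Akka.Streams"

open Akka
open Akka.Actor
open Akka.Streams
open Akka.Streams.Dsl

type IUser = interface end
type IHashtagEntity = interface end

type ITweet =
    abstract Hashtags: IHashtagEntity list
    abstract CreatedBy: IUser

let system = ActorSystem.Create("test")
let mat = system.Materializer()

// Hypothetical name: a partial graph whose only open port is the broadcast input.
let tweetSink =
    GraphDsl.Create(fun b ->
        let broadcast = b.Add(Broadcast<ITweet>(2))
        // Plain Akka.NET counterpart of `Flow.id |> Flow.map (...)` (assumption).
        let toUser = Flow.Create<ITweet>().Select(fun (tweet: ITweet) -> tweet.CreatedBy)
        // Adding the sinks to the builder yields SinkShape values to connect to.
        let ignoreUsers = b.Add(Sink.Ignore<IUser>())
        let ignoreTweets = b.Add(Sink.Ignore<ITweet>())

        // Output 0: map to the user, then terminate in a sink (no dangling output).
        b.From(broadcast.Out(0)).Via(toUser).To(ignoreUsers)
        // Output 1: must be connected as well, otherwise the graph is not closed.
        b.From(broadcast.Out(1)).To(ignoreTweets)

        // Expose the remaining open port (the broadcast input) as a sink.
        SinkShape<ITweet>(broadcast.In))

// Attach a source of your choice to obtain a runnable graph; an empty source
// is used here only so that the stream completes immediately.
Source.Empty<ITweet>().To(tweetSink).Run(mat) |> ignore

system.Terminate().Wait()
```

Returning SinkShape<ITweet>(broadcast.In) instead of ClosedShape.Instance is what lets the input stay open without triggering the shape mismatch from the stack trace: the declared shape then matches the ports that are actually left unconnected.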
