Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams example is broken #163

Open
anpin opened this issue Dec 8, 2022 · 4 comments
Open

Streams example is broken #163

anpin opened this issue Dec 8, 2022 · 4 comments

Comments

@anpin
Copy link

anpin commented Dec 8, 2022

Hi, thank you for your effort with this project! I found that streams.fsx example is throwing following exception when I do
dotnet fsi ./streams.fsx

System.InvalidCastException: Unable to cast object of type 'Akka.NotUsed' to type 'Microsoft.FSharp.Core.Unit'.
   at Akka.Streams.Implementation.Module.<>c__DisplayClass28_0`3.<Compose>b__0(Object x, Object y)
   at Akka.Streams.Implementation.MaterializerSession.ResolveMaterialized(IMaterializedValueNode node, IDictionary`2 values, Int32 spaces)
   at Akka.Streams.Implementation.MaterializerSession.ResolveMaterialized(IMaterializedValueNode node, IDictionary`2 values, Int32 spaces)
   at Akka.Streams.Implementation.MaterializerSession.ResolveMaterialized(IMaterializedValueNode node, IDictionary`2 values, Int32 spaces)
   at Akka.Streams.Implementation.MaterializerSession.MaterializeModule(IModule module, Attributes effectiveAttributes)
   at Akka.Streams.Implementation.MaterializerSession.Materialize()
   at Akka.Streams.Implementation.ActorMaterializerImpl.Materialize[TMat](IGraph`2 runnable, Func`2 subFlowFuser, Attributes initialAttributes)
   at Akka.Streams.Implementation.ActorMaterializerImpl.Materialize[TMat](IGraph`2 runnable)
   at Akka.Streams.Dsl.RunnableGraph`1.Run(IMaterializer materializer)
   at FSI_0002.it@105.Invoke(Unit unitVar) in /home/a/Akkling/examples/streams.fsx:line 105
   at Microsoft.FSharp.Control.AsyncPrimitives.CallThenInvoke[T,TResult](AsyncActivation`1 ctxt, TResult result1, FSharpFunc`2 part2) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 517
   at Microsoft.FSharp.Control.Trampoline.Execute(FSharpFunc`2 firstAction) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 112
--- End of stack trace from previous location ---
   at Microsoft.FSharp.Control.AsyncResult`1.Commit() in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 454
   at Microsoft.FSharp.Control.AsyncPrimitives.QueueAsyncAndWaitForResultSynchronously[a](CancellationToken token, FSharpAsync`1 computation, FSharpOption`1 timeout) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 1140
   at Microsoft.FSharp.Control.FSharpAsync.RunSynchronously[T](FSharpAsync`1 computation, FSharpOption`1 timeout, FSharpOption`1 cancellationToken) in D:\a\_work\1\s\src\FSharp.Core\async.fs:line 1511
   at <StartupCode$FSI_0002>.$FSI_0002.main@() in /home/a/Akkling/examples/streams.fsx:line 104
   at System.RuntimeMethodHandle.InvokeMethod(Object target, Void** arguments, Signature sig, Boolean isConstructor)
   at System.Reflection.MethodInvoker.Invoke(Object obj, IntPtr* args, BindingFlags invokeAttr)
@anpin
Copy link
Author

anpin commented Dec 8, 2022

parser ported from the docs returns different signature and works

let replParser =
    Flow.Create<string>().TakeWhile(fun c -> c <> "q")
        .Concat(Source.Single("BYE"))
        .Select(fun elem -> ByteString.FromString($"{elem}\n"));

let repl = Flow.Create<ByteString>()
                .Via(Framing.Delimiter(
                    ByteString.FromString("\n"),
                    maximumFrameLength = 256,
                    allowTruncation = true))
                .Select(fun c -> c.ToString())
                .Select(fun text ->
                            printf $"Server: {text}"
                            text
                )
                .Select(fun text -> Console.ReadLine())
                .Via(replParser);

@ingted
Copy link

ingted commented Jan 2, 2023

The workaround is:

module Source =
    let singleton s = 
        Source.Single(s).MapMaterializedValue(Func<Akka.NotUsed,Akka.NotUsed>(fun _ -> Akka.NotUsed.Instance))    
        

@ingted
Copy link

ingted commented Jan 2, 2023

Process A:

image

Process B:
image

@ingted
Copy link

ingted commented Jan 2, 2023

my fork: (master)
https://github.com/ingted/Akkling

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants