Skip to content

Commit

Permalink
Merge pull request #22 from Horusiath/feature/akkling-io
Browse files Browse the repository at this point in the history
Added support for Akka.IO API
  • Loading branch information
Horusiath committed Dec 17, 2015
2 parents 5ea1490 + 889884c commit 36519fd
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 7 deletions.
1 change: 1 addition & 0 deletions Akkling.sln
Expand Up @@ -42,6 +42,7 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "examples", "examples", "{8CC185AC-1D97-41F9-BBF1-09D892CA18B9}"
ProjectSection(SolutionItems) = preProject
examples\basic.fsx = examples\basic.fsx
examples\io.fsx = examples\io.fsx
examples\lifecycle.fsx = examples\lifecycle.fsx
examples\persistence.fsx = examples\persistence.fsx
examples\remote.fsx = examples\remote.fsx
Expand Down
42 changes: 42 additions & 0 deletions examples/io.fsx
@@ -0,0 +1,42 @@
#r "../src/Akkling/bin/Debug/Akka.dll"
#r "../src/Akkling/bin/Debug/Wire.dll"
#r "../src/Akkling/bin/Debug/Newtonsoft.Json.dll"
#r "../src/Akkling/bin/Debug/FSharp.PowerPack.dll"
#r "../src/Akkling/bin/Debug/FSharp.PowerPack.Linq.dll"
#r "../src/Akkling/bin/Debug/Akkling.dll"

open System
open Akkling
open Akkling.IO
open Akkling.IO.Tcp
open System.Net

let system = System.create "telnet-sys" <| Configuration.defaultConfig()

let handler connection = fun (ctx: Actor<obj>) ->
monitor ctx connection |> ignore
let rec loop () = actor {
let! msg = ctx.Receive ()
match msg with
| Received(data) ->
printfn "%s" (data.DecodeString())
return! loop ()
| Terminated(_, _,_) | ConnectionClosed(_) -> return Stop
| _ -> return Unhandled
}
loop ()

let endpoint = IPEndPoint(IPAddress.Loopback, 5000)
let listener = spawn system "listener" <| fun m ->
IO.Tcp(m) <! TcpMessage.Bind(m.Self, endpoint, 100)
let rec loop () = actor {
let! (msg: obj) = m.Receive ()
match msg with
| Connected(remote, local) ->
let conn = m.Sender ()
conn <! TcpMessage.Register(spawn m null (handler conn))
return! loop ()
| _ -> return Unhandled
}
loop ()

4 changes: 2 additions & 2 deletions examples/persistence.fsx
Expand Up @@ -38,8 +38,8 @@ let counter =
| GetState ->
mailbox.Sender() <! state
return! loop state
| Inc -> return Persist [ Event { Delta = 1 } ]
| Dec -> return Persist [ Event { Delta = -1 } ]
| Inc -> return Persist (Event { Delta = 1 })
| Dec -> return Persist (Event { Delta = -1 })
}
loop 0

Expand Down
13 changes: 9 additions & 4 deletions src/Akkling.Persistence/PersistentActor.fs
Expand Up @@ -83,16 +83,20 @@ and [<Interface>]ExtEventsourced<'Message> =
inherit ExtActor<'Message>

and PersistentEffect<'Message> =
| Persist of 'Message seq
| PersistAsync of 'Message seq
| Persist of 'Message
| PersistAll of 'Message seq
| PersistAsync of 'Message
| PersistAllAsync of 'Message seq
| Defer of 'Message seq
interface Effect with
member this.OnApplied(context, message) =
match context with
| :? ExtEventsourced<'Message> as persistentContext ->
match this with
| Persist(events) -> persistentContext.PersistEvent events
| PersistAsync(events) -> persistentContext.AsyncPersistEvent events
| Persist(event) -> persistentContext.PersistEvent [event]
| PersistAll(events) -> persistentContext.PersistEvent events
| PersistAsync(event) -> persistentContext.AsyncPersistEvent [event]
| PersistAllAsync(events) -> persistentContext.AsyncPersistEvent events
| Defer(events) -> persistentContext.DeferEvent events
| _ -> raise (Exception("Cannot use persistent effects in context of non-persistent actor"))

Expand All @@ -116,6 +120,7 @@ and TypedPersistentContext<'Message, 'Actor when 'Actor :> FunPersistentActor<'M
member __.Receive() = Input
member __.Self = typed self
member __.Sender<'Response>() = typed (context.Sender) :> IActorRef<'Response>
member __.Parent<'Other>() = typed (context.Parent) :> IActorRef<'Other>
member __.System = context.System
member __.ActorOf(props, name) = context.ActorOf(props, name)
member __.ActorSelection(path : string) = context.ActorSelection(path)
Expand Down
1 change: 1 addition & 0 deletions src/Akkling.Persistence/PersistentView.fs
Expand Up @@ -57,6 +57,7 @@ and TypedViewContext<'Message, 'Actor when 'Actor :> FunPersistentView<'Message>
member __.Receive() = Input
member __.Self = typed self
member __.Sender<'Response>() = typed (context.Sender) :> IActorRef<'Response>
member __.Parent<'Other>() = typed (context.Parent) :> IActorRef<'Other>
member __.System = context.System
member __.ActorOf(props, name) = context.ActorOf(props, name)
member __.ActorSelection(path : string) = context.ActorSelection(path)
Expand Down
6 changes: 6 additions & 0 deletions src/Akkling/Actors.fs
Expand Up @@ -44,6 +44,11 @@ type Actor<'Message> =
/// </summary>
abstract Sender<'Response> : unit -> IActorRef<'Response>

/// <summary>
/// Returns a parrent of current actor.
/// </summary>
abstract Parent<'Other> : unit -> IActorRef<'Other>

/// <summary>
/// Lazy logging adapter. It won't be initialized until logging function will be called.
/// </summary>
Expand Down Expand Up @@ -113,6 +118,7 @@ type TypedContext<'Message, 'Actor when 'Actor :> ActorBase and 'Actor :> IWithU
member __.Receive() = Input
member __.Self = typed self
member __.Sender<'Response>() = typed (context.Sender) :> IActorRef<'Response>
member __.Parent<'Other>() = typed (context.Parent) :> IActorRef<'Other>
member __.System = context.System
member __.ActorOf(props, name) = context.ActorOf(props, name)
member __.ActorSelection(path : string) = context.ActorSelection(path)
Expand Down
1 change: 1 addition & 0 deletions src/Akkling/Akkling.fsproj
Expand Up @@ -74,6 +74,7 @@
<Compile Include="Logging.fs" />
<Compile Include="Utils.fs" />
<Compile Include="Extensions.fs" />
<Compile Include="IO.fs" />
<None Include="paket.references" />
<None Include="Akkling.fsproj.paket.template" />
<None Include="app.config.install.xdt" />
Expand Down
79 changes: 79 additions & 0 deletions src/Akkling/IO.fs
@@ -0,0 +1,79 @@
//-----------------------------------------------------------------------
// <copyright file="IO.fs" company="Akka.NET Project">
// Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
// Copyright (C) 2013-2015 Akka.NET project <https://github.com/akkadotnet/akka.net>
// Copyright (C) 2015 Bartosz Sypytkowski <gttps://github.com/Horusiath>
// </copyright>
//-----------------------------------------------------------------------

namespace Akkling

module IO =

open Akka.IO
open System.Net

/// <summary>
/// Gets TCP manager for current actor.
/// </summary>
let Tcp(context: Actor<'Message>) : IActorRef<Akka.IO.Tcp.Command> =
typed (Akka.IO.Tcp.Manager(context.System))

/// <summary>
/// Gets UDP manager for current actor.
/// </summary>
let Udp(context: Actor<'Message>) : IActorRef<Akka.IO.Udp.Command> =
typed (Akka.IO.Udp.Manager(context.System))

type TcpMessage = Akka.IO.TcpMessage

module Tcp =

let (|Received|_|) (msg:obj) : ByteString option =
match msg with
| :? Tcp.Received as r -> Some (r.Data)
| _ -> None

let (|Connected|_|) (msg:obj) : (EndPoint * EndPoint) option =
match msg with
| :? Tcp.Connected as c -> Some (c.RemoteAddress, c.LocalAddress)
| _ -> None

let (|CommandFailed|_|) (msg:obj) : #Tcp.Command option =
match msg with
| :? Tcp.CommandFailed as c ->
if c.Cmd :? #Tcp.Command
then Some (c.Cmd :?> #Tcp.Command)
else None
| _ -> None

let (|ConnectionClosed|_|) (msg:obj) =
match msg with
| :? Tcp.ConnectionClosed as closed -> Some closed
| _ -> None

let (|Closed|Aborted|ConfirmedClosed|PeerClosed|ErrorClosed|) (msg:Tcp.ConnectionClosed) =
match msg with
| :? Tcp.Closed -> Closed
| :? Tcp.Aborted -> Aborted
| :? Tcp.ConfirmedClosed -> ConfirmedClosed
| :? Tcp.PeerClosed -> PeerClosed
| :? Tcp.ErrorClosed -> ErrorClosed

module Udp =

let inline Bind(ref: IActorRef<'t>, localAddress: EndPoint) =
Akka.IO.Udp.Bind(ref, localAddress) :> Udp.Command

let (|Received|_|) (msg:obj) : ByteString option =
match msg with
| :? Udp.Received as r -> Some (r.Data)
| _ -> None

let (|CommandFailed|_|) (msg:obj) : 'C option =
match msg with
| :? Udp.CommandFailed as c ->
if c.Cmd :? 'C
then Some (c.Cmd :?> 'C)
else None
| _ -> None
6 changes: 5 additions & 1 deletion src/Akkling/MessagePatterns.fs
Expand Up @@ -19,7 +19,11 @@ let (|Terminated|_|) (msg: obj) : (IActorRef<'T> * bool * bool) option =
| :? Terminated as t -> Some((typed t.ActorRef, t.ExistenceConfirmed, t.AddressTerminated))
| _ -> None


/// <summary>
/// Active pattern that matches message agains <see cref="ActorIdentity"/> message.
/// This is the result of <see cref="Identify"/> request send with matching correlation id.
/// Response contains actor ref of the requested identity or None if no actor was found.
/// </summary>
let (|ActorIdentity|_|) (msg: obj) : ('CorrelationId * IActorRef<'T> option) option =
match msg with
| :? ActorIdentity as identity ->
Expand Down

0 comments on commit 36519fd

Please sign in to comment.