Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Breaking Datagram API change. #25

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 17 additions & 27 deletions chronos/transports/datagram.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import net, nativesockets, os, deques
import ../asyncloop, ../handles
import common

export OSErrorCode

when defined(windows):
import winlean
else:
Expand All @@ -28,7 +30,9 @@ type
writer: Future[void] # Writer vector completion Future

DatagramCallback* = proc(transp: DatagramTransport,
remote: TransportAddress): Future[void] {.gcsafe.}
message: seq[byte],
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

openArray[byte] tends to give more flexibility for the implementation.. a common option later on is to allow the original caller to set up a byte buffer of their choice..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is also the kind of place where Result is appropriate - giving either an error or a buffer but never both - makes for cleaner code everywhere

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arnetheduck we can't use openarray[T] because it can't be captured inside of iterator, so it can't work in async procedures.

remote: TransportAddress,
error: OSErrorCode): Future[void] {.gcsafe.}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like error which can happen here is either not realistic or unrelated to remote-specific message receiving. Thus I would consider removing the error from this function signature. For remote-unrelated events I would introduce another callback like onNetworkChanged.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, but chronos is cross-platform framework, and if Linux has small number of errors, BSD/MacOS and Windows has much more, so i wish to leave error code here.


DatagramTransport* = ref object of RootRef
fd*: AsyncFD # File descriptor
Expand Down Expand Up @@ -138,17 +142,18 @@ when defined(windows):
if bytesCount == 0:
transp.state.incl({ReadEof, ReadPaused})
fromSAddr(addr transp.raddr, transp.ralen, raddr)
transp.buflen = bytesCount
asyncCheck transp.function(transp, raddr)
var msg = transp.buffer
msg.setLen(bytesCount)
asyncCheck transp.function(transp, msg, raddr, OSErrorCode(0))
elif int(err) == ERROR_OPERATION_ABORTED:
# CancelIO() interrupt
transp.state.incl(ReadPaused)
break
else:
transp.setReadError(err)
transp.state.incl(ReadPaused)
transp.buflen = 0
asyncCheck transp.function(transp, raddr)
var msg = newSeq[byte]()
asyncCheck transp.function(transp, msg, raddr, err)
else:
## Initiation
if transp.state * {ReadEof, ReadClosed, ReadError} == {}:
Expand Down Expand Up @@ -177,8 +182,8 @@ when defined(windows):
transp.state.excl(ReadPending)
transp.state.incl(ReadPaused)
transp.setReadError(err)
transp.buflen = 0
asyncCheck transp.function(transp, raddr)
var msg = newSeq[byte]()
asyncCheck transp.function(transp, msg, raddr, err)
break

proc resumeRead(transp: DatagramTransport) {.inline.} =
Expand Down Expand Up @@ -311,16 +316,17 @@ else:
addr transp.ralen)
if res >= 0:
fromSAddr(addr transp.raddr, transp.ralen, raddr)
transp.buflen = res
asyncCheck transp.function(transp, raddr)
var msg = transp.buffer
msg.setLen(res)
asyncCheck transp.function(transp, msg, raddr, OSErrorCode(0))
else:
let err = osLastError()
if int(err) == EINTR:
continue
else:
transp.buflen = 0
transp.setReadError(err)
asyncCheck transp.function(transp, raddr)
var msg = newSeq[byte]()
asyncCheck transp.function(transp, msg, raddr, err)
break

proc writeDatagramLoop(udata: pointer) =
Expand Down Expand Up @@ -658,22 +664,6 @@ proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress,
transp.resumeWrite()
return retFuture

proc peekMessage*(transp: DatagramTransport, msg: var seq[byte],
msglen: var int) =
## Get access to internal message buffer and length of incoming datagram.
if ReadError in transp.state:
raise transp.getError()
shallowCopy(msg, transp.buffer)
msglen = transp.buflen

proc getMessage*(transp: DatagramTransport): seq[byte] =
## Copy data from internal message buffer and return result.
if ReadError in transp.state:
raise transp.getError()
if transp.buflen > 0:
result = newSeq[byte](transp.buflen)
copyMem(addr result[0], addr transp.buffer[0], transp.buflen)

proc getUserData*[T](transp: DatagramTransport): T {.inline.} =
## Obtain user data stored in ``transp`` object.
result = cast[T](transp.udata)
Expand Down
11 changes: 5 additions & 6 deletions tests/testbugs.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ type
test: string

proc udp4DataAvailable(transp: DatagramTransport,
remote: TransportAddress): Future[void] {.async, gcsafe.} =
data: seq[byte],
remote: TransportAddress,
error: OSErrorCode): Future[void] {.async, gcsafe.} =
var udata = getUserData[CustomData](transp)
var expect = TEST_MSG
var data: seq[byte]
var datalen: int
transp.peekMessage(data, datalen)
if udata.test == "CHECK" and datalen == MSG_LEN and
equalMem(addr data[0], addr expect[0], datalen):
if udata.test == "CHECK" and len(data) == MSG_LEN and
equalMem(unsafeAddr data[0], addr expect[0], len(data)):
udata.test = "OK"
transp.close()

Expand Down
159 changes: 58 additions & 101 deletions tests/testdatagram.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,11 @@ const
ClientsCount = 20
MessagesCount = 20

proc client1(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
proc client1(transp: DatagramTransport, pbytes: seq[byte],
raddr: TransportAddress,
error: OSErrorCode): Future[void] {.async.} =
if len(pbytes) > 0:
var data = cast[string](pbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
Expand All @@ -36,15 +32,11 @@ proc client1(transp: DatagramTransport,
counterPtr[] = -1
transp.close()

proc client2(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
proc client2(transp: DatagramTransport, pbytes: seq[byte],
raddr: TransportAddress,
error: OSErrorCode): Future[void] {.async.} =
if len(pbytes) > 0:
var data = cast[string](pbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
Expand All @@ -64,15 +56,11 @@ proc client2(transp: DatagramTransport,
counterPtr[] = -1
transp.close()

proc client3(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
proc client3(transp: DatagramTransport, pbytes: seq[byte],
raddr: TransportAddress,
error: OSErrorCode): Future[void] {.async.} =
if len(pbytes) > 0:
var data = cast[string](pbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
Expand All @@ -91,15 +79,11 @@ proc client3(transp: DatagramTransport,
counterPtr[] = -1
transp.close()

proc client4(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
proc client4(transp: DatagramTransport, pbytes: seq[byte],
raddr: TransportAddress,
error: OSErrorCode): Future[void] {.async.} =
if len(pbytes) > 0:
var data = cast[string](pbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
Expand All @@ -118,15 +102,11 @@ proc client4(transp: DatagramTransport,
counterPtr[] = -1
transp.close()

proc client5(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
proc client5(transp: DatagramTransport, pbytes: seq[byte],
raddr: TransportAddress,
error: OSErrorCode): Future[void] {.async.} =
if len(pbytes) > 0:
var data = cast[string](pbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
Expand All @@ -145,15 +125,11 @@ proc client5(transp: DatagramTransport,
counterPtr[] = -1
transp.close()

proc client6(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
proc client6(transp: DatagramTransport, pbytes: seq[byte],
raddr: TransportAddress,
error: OSErrorCode): Future[void] {.async.} =
if len(pbytes) > 0:
var data = cast[string](pbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
Expand All @@ -168,15 +144,11 @@ proc client6(transp: DatagramTransport,
counterPtr[] = -1
transp.close()

proc client7(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
proc client7(transp: DatagramTransport, pbytes: seq[byte],
raddr: TransportAddress,
error: OSErrorCode): Future[void] {.async.} =
if len(pbytes) > 0:
var data = cast[string](pbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
Expand All @@ -195,15 +167,11 @@ proc client7(transp: DatagramTransport,
counterPtr[] = -1
transp.close()

proc client8(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
proc client8(transp: DatagramTransport, pbytes: seq[byte],
raddr: TransportAddress,
error: OSErrorCode): Future[void] {.async.} =
if len(pbytes) > 0:
var data = cast[string](pbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
Expand All @@ -222,15 +190,11 @@ proc client8(transp: DatagramTransport,
counterPtr[] = -1
transp.close()

proc client9(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
proc client9(transp: DatagramTransport, pbytes: seq[byte],
raddr: TransportAddress,
error: OSErrorCode): Future[void] {.async.} =
if len(pbytes) > 0:
var data = cast[string](pbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
Expand All @@ -249,15 +213,11 @@ proc client9(transp: DatagramTransport,
counterPtr[] = -1
transp.close()

proc client10(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
proc client10(transp: DatagramTransport, pbytes: seq[byte],
raddr: TransportAddress,
error: OSErrorCode): Future[void] {.async.} =
if len(pbytes) > 0:
var data = cast[string](pbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
Expand All @@ -278,15 +238,11 @@ proc client10(transp: DatagramTransport,
counterPtr[] = -1
transp.close()

proc client11(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes: seq[byte]
var nbytes: int
transp.peekMessage(pbytes, nbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
proc client11(transp: DatagramTransport, pbytes: seq[byte],
raddr: TransportAddress,
error: OSErrorCode): Future[void] {.async.} =
if len(pbytes) > 0:
var data = cast[string](pbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
Expand Down Expand Up @@ -434,8 +390,9 @@ proc test3(bounded: bool): Future[int] {.async.} =
proc testConnReset(): Future[bool] {.async.} =
var ta = initTAddress("127.0.0.1:65000")
var counter = 0
proc clientMark(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
proc clientMark(transp: DatagramTransport, pbytes: seq[byte],
raddr: TransportAddress,
error: OSErrorCode): Future[void] {.async.} =
counter = 1
transp.close()
var dgram1 = newDatagramTransport(client1, local = ta)
Expand Down