Skip to content

Commit

Permalink
Merge pull request #16172 from shoshanatech/socket-release-all-2
Browse files Browse the repository at this point in the history
Release waiting processes when destroying a socket (without ThreadedFFI dependency)
  • Loading branch information
MarcusDenker committed Apr 25, 2024
2 parents 9e40a62 + 82d4d7b commit c754edc
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 76 deletions.
155 changes: 99 additions & 56 deletions src/Network-Kernel/Socket.class.st
Expand Up @@ -465,11 +465,18 @@ Socket >> connectTo: hostAddress port: port [
Socket >> connectTo: hostAddress port: port waitForConnectionFor: timeout [
"Initiate a connection to the given port at the given host
address. Waits until the connection is established or time outs."

self connectNonBlockingTo: hostAddress port: port.
self
waitForConnectionFor: timeout
ifTimedOut: [ConnectionTimedOut signal: 'Cannot connect to '
, (NetNameResolver stringFromAddress: hostAddress) , ':' , port asString]
ifClosed: [
ConnectionClosed signal: 'Connection aborted to '
, (NetNameResolver stringFromAddress: hostAddress) , ':'
, port asString ]
ifTimedOut: [
ConnectionTimedOut signal: 'Cannot connect to '
, (NetNameResolver stringFromAddress: hostAddress) , ':'
, port asString ]
]

{ #category : 'connection open/close' }
Expand All @@ -489,18 +496,30 @@ Socket >> dataAvailable [

{ #category : 'initialize - destroy' }
Socket >> destroy [
"Destroy this socket. Its connection, if any, is aborted and its resources are freed. Do nothing if the socket has already been destroyed (i.e., if its socketHandle is nil)."
"Destroy this socket. Its connection, if any, is aborted and its resources are freed.
Any processes waiting on the socket are freed immediately, but it is up to them to
recognize that the socket has been destroyed.
Do nothing if the socket has already been destroyed (i.e., if its socketHandle is nil)."

socketHandle
ifNotNil: [
self isValid
ifTrue: [ self primSocketDestroy: socketHandle ].
Smalltalk unregisterExternalObject: semaphore.
Smalltalk unregisterExternalObject: readSemaphore.
Smalltalk unregisterExternalObject: writeSemaphore.
socketHandle := nil.
readSemaphore := writeSemaphore := semaphore := nil.
self unregister ]
socketHandle ifNotNil: [
| saveSemaphores |
self isValid ifTrue: [ self primSocketDestroy: socketHandle ].
socketHandle := nil.
Smalltalk unregisterExternalObject: semaphore.
Smalltalk unregisterExternalObject: readSemaphore.
Smalltalk unregisterExternalObject: writeSemaphore.
"Stash the semaphores and nil them before signaling to make sure
no caller gets a chance to wait on them again and block forever."
saveSemaphores := {
semaphore.
readSemaphore.
writeSemaphore }.
semaphore := readSemaphore := writeSemaphore := nil.
"A single #signal should be sufficient, as multiple processes trying to
read or write at once will result in undefined behavior anyway as their
data gets all mixed up together."
saveSemaphores do: [ :each | each signal ].
self unregister ]
]

{ #category : 'receiving' }
Expand Down Expand Up @@ -1334,6 +1353,7 @@ Socket >> retryIfWaitingForConnection: aBlock [
ifTrue: [
self
waitForConnectionFor: Socket standardTimeout
ifClosed: nil
ifTimedOut: nil.
aBlock value ]
ifFalse: [ e pass ] ]
Expand Down Expand Up @@ -1529,15 +1549,19 @@ Socket >> setPort: port [

{ #category : 'queries' }
Socket >> socketError [
^self primSocketError: socketHandle

^ socketHandle ifNotNil: [ self primSocketError: socketHandle ]
]

{ #category : 'queries' }
Socket >> socketErrorMessage [

^ [ OSPlatform current getErrorMessage: self socketError ]
on: Error
do: [ 'Error code: ' , self socketError printString ]
^ self socketError
ifNil: [ 'Socket destroyed, cannot retrieve error message' ]
ifNotNil: [ :err |
[ OSPlatform current getErrorMessage: err ]
on: Error
do: [ 'Error code: ' , err printString ] ]
]

{ #category : 'accessing' }
Expand Down Expand Up @@ -1569,40 +1593,57 @@ Socket >> unregister [
{ #category : 'waiting' }
Socket >> waitForAcceptFor: timeout [
"Wait and accept an incoming connection. Return nil if it fails"
^ self waitForAcceptFor: timeout ifTimedOut: [nil]

^ self waitForAcceptFor: timeout ifClosed: nil ifTimedOut: nil
]

{ #category : 'waiting' }
Socket >> waitForAcceptFor: timeout ifTimedOut: timeoutBlock [
Socket >> waitForAcceptFor: timeout ifClosed: closedBlock ifTimedOut: timeoutBlock [
"Wait and accept an incoming connection"
self waitForConnectionFor: timeout ifTimedOut: [^timeoutBlock value].
^self accept

self
waitForConnectionFor: timeout
ifClosed: [ ^ closedBlock value ]
ifTimedOut: [ ^ timeoutBlock value ].
^ self accept
]

{ #category : 'waiting' }
Socket >> waitForConnectionFor: timeout [
"Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."

^self
waitForConnectionFor: timeout
ifTimedOut: [ConnectionTimedOut signal: 'Failed to connect in ', timeout asString, ' seconds']
^ self
waitForConnectionFor: timeout
ifClosed: [
ConnectionClosed signal: (socketHandle
ifNil: [ 'Socket destroyed while connecting' ]
ifNotNil: [
'Connection aborted or failed: ' , self socketErrorMessage ]) ]
ifTimedOut: [
ConnectionTimedOut signal:
'Failed to connect in ' , timeout asString , ' seconds' ]
]

{ #category : 'waiting' }
Socket >> waitForConnectionFor: timeout ifTimedOut: timeoutBlock [
"Wait up until the given deadline for a connection to be established. Return true if it is established by the deadline, false if not."
Socket >> waitForConnectionFor: timeout ifClosed: closedBlock ifTimedOut: timeoutBlock [
"Wait up until the given deadline for a connection to be established.
Evaluate closedBlock if the connection is closed locally,
or timeoutBlock if the deadline expires.
We should separately detect the case of a connection being refused here as well."

| startTime msecsDelta msecsEllapsed status |
| startTime msecsDelta msecsElapsed status |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated.

[
status := self primSocketConnectionStatus: socketHandle.
[(status = WaitingForConnection) and: [(msecsEllapsed := Time millisecondsSince: startTime) < msecsDelta]]
whileTrue: [
semaphore waitTimeoutMilliseconds: msecsDelta - msecsEllapsed.
status := self primSocketConnectionStatus: socketHandle].
status == WaitingForConnection and: [
(msecsElapsed := Time millisecondsSince: startTime) < msecsDelta ] ]
whileTrue: [ semaphore waitTimeoutMilliseconds: msecsDelta - msecsElapsed ].

status = Connected ifFalse: [^timeoutBlock value].
^ true
status == WaitingForConnection ifTrue: [ ^ timeoutBlock value ].
status == Connected ifFalse: [ ^ closedBlock value ]
]

{ #category : 'waiting' }
Expand All @@ -1628,7 +1669,9 @@ Socket >> waitForDataFor: timeout [

{ #category : 'waiting' }
Socket >> waitForDataFor: timeout ifClosed: closedBlock ifTimedOut: timedOutBlock [
"Wait for the given nr of seconds for data to arrive."
"Wait for the given nr of seconds for data to arrive.
If it does not, execute <timedOutBlock>. If the connection
is closed before any data arrives, execute <closedBlock>."

| startTime msecsDelta msecsElapsed |
startTime := Time millisecondClockValue.
Expand All @@ -1637,21 +1680,17 @@ Socket >> waitForDataFor: timeout ifClosed: closedBlock ifTimedOut: timedOutBloc
self isConnected ifFalse: [ ^ closedBlock value ].
(msecsElapsed := Time millisecondsSince: startTime) < msecsDelta
ifFalse: [ ^ timedOutBlock value ].
self readSemaphore waitTimeoutMilliseconds: msecsDelta - msecsElapsed ]
readSemaphore waitTimeoutMilliseconds: msecsDelta - msecsElapsed ]
]

{ #category : 'waiting' }
Socket >> waitForDataIfClosed: closedBlock [
"Wait indefinitely for data to arrive. This method will block until
data is available or the socket is closed."

[true]
whileTrue: [
self dataAvailable
ifTrue: [^self].
self isConnected
ifFalse: [^closedBlock value].
self readSemaphore wait]
[ self dataAvailable ] whileFalse: [
self isConnected ifFalse: [ ^ closedBlock value ].
readSemaphore wait ]
]

{ #category : 'waiting' }
Expand All @@ -1662,22 +1701,23 @@ Socket >> waitForDisconnectionFor: timeout [
(e.g., because he has called 'close' to send a close request to the other end)
before calling this method."

| startTime msecsDelta status |
| startTime msecsDelta msecsElapsed status |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated.

[
status := self primSocketConnectionStatus: socketHandle.
[((status == Connected) or: [(status == ThisEndClosed)]) and:
[(Time millisecondsSince: startTime) < msecsDelta]] whileTrue: [
self discardReceivedData.
self readSemaphore waitTimeoutMilliseconds:
(msecsDelta - (Time millisecondsSince: startTime) max: 0).
status := self primSocketConnectionStatus: socketHandle].
(status == Connected or: [ status == ThisEndClosed ]) and: [
(msecsElapsed := Time millisecondsSince: startTime) < msecsDelta ] ]
whileTrue: [
self discardReceivedData.
readSemaphore waitTimeoutMilliseconds: msecsDelta - msecsElapsed ].
^ status ~= Connected
]

{ #category : 'waiting' }
Socket >> waitForSendDoneFor: timeout [
"Wait up until the given deadline for the current send operation to complete.
"Wait up until the given deadline for the current send operation to complete.
Raise an exception if the timeout expires or the connection is closed before sending finishes."

^ self
Expand All @@ -1691,18 +1731,21 @@ Socket >> waitForSendDoneFor: timeout ifClosed: closedBlock ifTimedOut: timedOut
"Wait up until the given deadline for the current send operation to complete.
If it does not, execute <timedOutBlock>. If the connection is closed before
the send completes, execute <closedBlock>."
| startTime msecsDelta msecsElapsed sendDone |


| startTime msecsDelta msecsElapsed |
startTime := Time millisecondClockValue.
msecsDelta := (timeout * 1000) truncated.
"Connection end and final data can happen fast, so test in this order"
[ sendDone := self sendDone ] whileFalse: [
[ self sendDone ] whileFalse: [
self isConnected ifFalse: [ ^ closedBlock value ].
(msecsElapsed := Time millisecondsSince: startTime) < msecsDelta
ifFalse: [ ^ timedOutBlock value ].
self writeSemaphore waitTimeoutMilliseconds: msecsDelta - msecsElapsed ].

^ sendDone
writeSemaphore waitTimeoutMilliseconds: msecsDelta - msecsElapsed ].

"For backward compatibility with Pharo <= 11, return a boolean indicating
whether the send is completed. The loop will only terminate when this
is the case, so simply return true."
^ true
]

{ #category : 'accessing' }
Expand Down
57 changes: 37 additions & 20 deletions src/Network-Tests/SocketStreamTest.class.st
Expand Up @@ -19,6 +19,23 @@ SocketStreamTest >> classToBeTested [
^ SocketStream
]

{ #category : 'stream protocol' }
SocketStreamTest >> closeServerAndSendOnceFromClient [

serverStream close.
"The first send after the other end is closed will not have problems as long as it fits within the send buffer."
self
shouldnt: [
clientStream
nextPutAll: 'A line of text';
flush ]
raise: NetworkError.
"Wait for the state of the underlying socket to update--normally this happens near-instantly,
but when running tests in a batch of other sockets-related tests, e.g. in CI, often a subsequent send
will still succeed without the wait, causing the test to fail."
(Delay forMilliseconds: 100) wait
]

{ #category : 'running' }
SocketStreamTest >> setUp [
| listener clientSocket serverSocket |
Expand Down Expand Up @@ -48,13 +65,25 @@ SocketStreamTest >> tearDown [

{ #category : 'stream protocol' }
SocketStreamTest >> testFlushLargeMessageOtherEndClosed [
"A large send will be broken into chunks and fail on the second one."

"A large send should be broken into chunks and fail once the TCP send buffer
is full. Ensure we actually exceed the size of the send buffer, but also try
to reduce it first so we don't need an excessively large message. Some platforms
(Linux) have minimum values for this setting that prevent us from relying on
lowering it alone. Attempt to set a 65KiB buffer and double whatever we get."

| bufferSize |
"Windows seems to accept arbitrarily large send() payloads regardless of the
send buffer size, so we can't force a wait other than by calling #flush twice,
which would defeat the whole purpose of the test.
Skip on Windows for now as this is not a regression."
OSPlatform current isWindows ifTrue: [ self skip ].
clientStream socket setOption: 'SO_SNDBUF' value: 2 ** 16.
bufferSize := (clientStream socket getOption: 'SO_SNDBUF') second.
serverStream close.
self
should: [ "256kiB"
should: [
clientStream
nextPutAll: (String new: 2 ** 18 withAll: $a);
nextPutAll: (String new: bufferSize * 2 withAll: $a);
flush ]
raise: ConnectionClosed
]
Expand All @@ -63,15 +92,7 @@ SocketStreamTest >> testFlushLargeMessageOtherEndClosed [
SocketStreamTest >> testFlushOtherEndClosed [
"Ensure that #flush properly raises an error when called when the other end is closed."

serverStream close.
"The first send after the other end is closed will not have problems as long as it fits within the send buffer."
self
shouldnt: [
clientStream
nextPutAll: 'A line of text';
flush ]
raise: NetworkError.
"The next send will wait for the first to finish and discover the error condition."
self closeServerAndSendOnceFromClient.
self
should: [
clientStream
Expand Down Expand Up @@ -105,15 +126,11 @@ SocketStreamTest >> testNextIntoCloseNonSignaling [

{ #category : 'stream protocol' }
SocketStreamTest >> testNextPutAllFlushOtherEndClosed [
"#nextPutAllFlush: does its own error handling, so needs to be tested separately.
Having some content in the buffer first means that the direct send of the second buffer will encounter an error."
"#nextPutAllFlush: does its own error handling, so needs to be tested separately."

serverStream close.
self closeServerAndSendOnceFromClient.
self
should: [
clientStream
nextPutAll: 'A line of text';
nextPutAllFlush: 'Another line of text' ]
should: [ clientStream nextPutAllFlush: 'Another line of text' ]
raise: ConnectionClosed
]

Expand Down

0 comments on commit c754edc

Please sign in to comment.