Skip to content

Commit

Permalink
Merge branch 'libssh' into raw
Browse files Browse the repository at this point in the history
  • Loading branch information
Carlos Cabanero committed Jan 15, 2024
2 parents 7a3649d + 7e2e2fd commit 0c7ece2
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
28 changes: 22 additions & 6 deletions SSH/SFTP.swift
Expand Up @@ -505,12 +505,16 @@ extension SFTPFile: BlinkFiles.Reader, BlinkFiles.WriterTo {
receiveRequest: receiveRequest(_:),
on: rloop)
.flatMap(maxPublishers: .max(1)) { data -> AnyPublisher<Int, Error> in
self.log.message("WRITING \(data.count)", SSH_LOG_DEBUG)
return w.write(data, max: data.count)
}.eraseToAnyPublisher()
}
.print()
.eraseToAnyPublisher()
}

private func receiveRequest(_ req: Subscribers.Demand) {
self.demand = req
self.log.message("Received read request. Current demand \(self.demand).", SSH_LOG_DEBUG)
self.inflightReadsLoop()
}

Expand All @@ -536,6 +540,8 @@ extension SFTPFile: BlinkFiles.Reader, BlinkFiles.WriterTo {
}
}

self.log.message("Scheduled reads \(inflightReads.count). Current demand \(self.demand).", SSH_LOG_DEBUG)

// Schedule more blocks to read. This way data will already be ready when we come back.
while isComplete == false && inflightReads.count < self.maxConcurrentOps {
let asyncRequest = sftp_async_read_begin(self.file, UInt32(self.blockSize))
Expand All @@ -545,7 +551,7 @@ extension SFTPFile: BlinkFiles.Reader, BlinkFiles.WriterTo {
}
inflightReads.append(UInt32(asyncRequest))
}

if let data = data, data.count > 0 {
pub.send(data)
// TODO Account for demand here
Expand All @@ -554,13 +560,16 @@ extension SFTPFile: BlinkFiles.Reader, BlinkFiles.WriterTo {
}
}

self.log.message("Next reads \(inflightReads.count). Current demand \(self.demand).", SSH_LOG_DEBUG)

if isComplete {
pub.send(completion: .finished)
return
}

// Enqueue again if there is still demand.
if self.demand != .none {
self.log.message("Enqueuing next read block", SSH_LOG_DEBUG)
rloop.schedule(after: .init(Date(timeIntervalSinceNow: 0.001))) {
self.inflightReadsLoop()
}
Expand Down Expand Up @@ -625,6 +634,11 @@ extension SFTPFile: BlinkFiles.Writer {
var written = wn
var isFinished = false

self.log.message("Scheduled writes \(inflightWrites.count).", SSH_LOG_DEBUG)

ssh_channel_set_blocking(self.channel, 1)
defer { ssh_channel_set_blocking(self.channel, 0) }

if inflightWrites.count > 0 {
// Check scheduled writes
do {
Expand All @@ -647,9 +661,6 @@ extension SFTPFile: BlinkFiles.Writer {
}
}

ssh_channel_set_blocking(self.channel, 1)
defer { ssh_channel_set_blocking(self.channel, 0) }

// Schedule more writes
while inflightWrites.count < self.maxConcurrentOps && write.count > 0 {
var asyncRequest: UInt32 = 0
Expand All @@ -673,6 +684,8 @@ extension SFTPFile: BlinkFiles.Writer {
write = write.subdata(in: length..<write.count)
}

self.log.message("New writes \(inflightWrites.count).", SSH_LOG_DEBUG)

if writtenBytes > 0 {
// Publish bytes written
pb.send(writtenBytes)
Expand All @@ -693,10 +706,13 @@ extension SFTPFile: BlinkFiles.Writer {

func checkWrites() throws -> Int {
var lastIdx = 0

for block in inflightWrites {
self.log.message("sftp_async_write_end sent", SSH_LOG_DEBUG)
let rc = sftp_async_write_end(self.file, block, 0)
self.log.message("sftp_async_write_end \(rc)", SSH_LOG_DEBUG)
if rc == SSH_AGAIN {
self.log.message("Write AGAIN", SSH_LOG_DEBUG)
break
} else if rc != SSH_OK {
throw FileError(title: "Error while writing block", in: session)
Expand Down
2 changes: 1 addition & 1 deletion SSHTests/AuthTests.swift
Expand Up @@ -173,7 +173,7 @@ class AuthTests: XCTestCase {
func testAgentPartialAuthentication() throws {
let agent = SSHAgent()
let key = try SSHKey(fromFileBlob: Credentials.privateKey.data(using: .utf8)!)
XCTAssertTrue(agent.loadKey(key, aka: "testKey"))
agent.loadKey(key, aka: "testKey")

let config = SSHClientConfig(
user: Credentials.partialAuthentication.user,
Expand Down

0 comments on commit 0c7ece2

Please sign in to comment.