Skip to content

Commit

Permalink
Merge pull request #464 from TerraNibble/gh-427
Browse files Browse the repository at this point in the history
GH-427: Read initial ACK on channel open prior to direct stream upload & close streams prior to exit code handling
  • Loading branch information
tomaswolf committed Apr 27, 2024
2 parents f8a0aec + 61576a5 commit e754db9
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 17 deletions.
Expand Up @@ -112,13 +112,17 @@ public void upload(
String cmd = ScpClient.createSendCommand(remote, options);
ClientSession session = getClientSession();
ChannelExec channel = openCommandChannel(session, cmd);
try (InputStream invOut = channel.getInvertedOut();
OutputStream invIn = channel.getInvertedIn()) {
// NOTE: we use a mock file system since we expect no invocations for it
ScpHelper helper = new ScpHelper(session, invOut, invIn, new MockFileSystem(remote), opener, listener);
Path mockPath = new MockPath(remote);
helper.sendStream(new DefaultScpStreamResolver(name, mockPath, perms, time, size, local, cmd),
options.contains(Option.PreserveAttributes), ScpHelper.DEFAULT_SEND_BUFFER_SIZE);
try {
try (InputStream invOut = channel.getInvertedOut();
OutputStream invIn = channel.getInvertedIn()) {
// NOTE: we use a mock file system since we expect no invocations for it
ScpHelper helper = new ScpHelper(session, invOut, invIn, new MockFileSystem(remote), opener, listener);
Path mockPath = new MockPath(remote);
DefaultScpStreamResolver resolver = new DefaultScpStreamResolver(name, mockPath, perms, time, size, local, cmd);
helper.readAndValidateOperationAck(cmd, resolver);
helper.sendStream(resolver, options.contains(Option.PreserveAttributes),
ScpHelper.DEFAULT_SEND_BUFFER_SIZE);
}
handleCommandExitStatus(cmd, channel);
} finally {
channel.close(false);
Expand Down
21 changes: 11 additions & 10 deletions sshd-scp/src/main/java/org/apache/sshd/scp/common/ScpHelper.java
Expand Up @@ -411,12 +411,8 @@ public String readLine(boolean canEof) throws IOException {
}

public void send(Collection<String> paths, boolean recursive, boolean preserve, int bufferSize) throws IOException {
ScpAckInfo ackInfo = readAck(false);
boolean debugEnabled = log.isDebugEnabled();
if (debugEnabled) {
log.debug("send({}) ACK={}", paths, ackInfo);
}
validateOperationReadyCode("send", "Paths", ackInfo);
readAndValidateOperationAck("send", "Paths");

LinkOption[] options = IoUtils.getLinkOptions(true);
for (String pattern : paths) {
Expand Down Expand Up @@ -464,11 +460,7 @@ public void send(Collection<String> paths, boolean recursive, boolean preserve,

public void sendPaths(Collection<? extends Path> paths, boolean recursive, boolean preserve, int bufferSize)
throws IOException {
ScpAckInfo ackInfo = readAck(false);
if (log.isDebugEnabled()) {
log.debug("sendPaths({}) ACK={}", paths, ackInfo);
}
validateOperationReadyCode("sendPaths", "Paths", ackInfo);
readAndValidateOperationAck("sendPaths", "Paths");

LinkOption[] options = IoUtils.getLinkOptions(true);
for (Path file : paths) {
Expand Down Expand Up @@ -750,6 +742,15 @@ public ScpAckInfo readAck(boolean canEof) throws IOException {
return ScpAckInfo.readAck(in, csIn, canEof);
}

public void readAndValidateOperationAck(String cmd, Object location) throws IOException {
ScpAckInfo ackInfo = readAck(false);
boolean debugEnabled = log.isDebugEnabled();
if (debugEnabled) {
log.debug("readAndValidateOperationAck({}) ACK={}", location, ackInfo);
}
validateOperationReadyCode(cmd, location, ackInfo);
}

@Override
public String toString() {
return getClass().getSimpleName() + "[" + getSession() + "]";
Expand Down

0 comments on commit e754db9

Please sign in to comment.