[QUESTION] - Potential timeout? Advice on correct usage. #71

Open
moranbw opened this issue Jan 22, 2024 · 0 comments
Labels
question Further information is requested

moranbw commented Jan 22, 2024

Hello, thanks for the great crate!

I am hoping you can help with a strange issue some of my users are experiencing. I am essentially just looping through a list of files to download. Some users have reported that the downloads begin but eventually stall and never continue. In the attached logs you can see the downloads start, but the user reported waiting about 15 minutes with no further progress (the files are not large and everything is on a LAN, so it should not be a network issue). I download in batches of 10 using tokio::sync::Semaphore, but the issue also popped up occasionally when I was looping through the files one by one.

Am I doing anything wrong in my code? At one point I was using the standard ftp_stream.retr_as_buffer method; I switched to ftp_stream.retr_as_stream to see if that would help, but it does not seem to have made a difference. As you can tell from the logs, ap_ftp_copy never returns, with either Ok or Err.

Any advice is welcome!

Code

//////////////////////////////////////////////////////////////////
        info!("Copying POS from AP...");
        progress_bar.set_message(format!("{} (Copying POS from AP...)", date_string.clone()));
        let pos_ap_path = Path::new(&parent_path).join("pos_ap");

        let permits = Arc::new(Semaphore::new(10));
        let futures = date.pos.ap.clone().into_iter().map(|pos| {
            let permits = Arc::clone(&permits);
            let pos_ap_path_clone = pos_ap_path.clone();
            let pos_clone: String = pos.clone();
            async move {
                let permit = match permits.acquire().await {
                    Ok(permit) => permit,
                    Err(e) => {
                        return Err(CommandError {
                            message: format!(
                                "Could not acquire permit to copy {}",
                                pos_clone.clone()
                            ),
                            cause: e.to_string(),
                        })
                    }
                };
                info!("[pos_ap] Copying {}", pos_clone);
                match ap_ftp_copy(pos_clone.as_str(), &pos_ap_path_clone).await {
                    Ok(message) => {                
                        info!("[pos_ap] Successfully copied {}", pos_clone);
                        drop(permit);
                        progress_bar.inc(1);
                        Ok(message)
                    }
                    Err(e) => return Err(e),
                }
            }
        });

        let results = future::join_all(futures).await;
        for result in results {
            match result {
                Ok(message) => debug!("{}", message),
                Err(e) => return Err(e),
            }
        }
//////////////////////////////////////////////////////////////////

pub async fn ap_ftp_copy(file: &str, dest: &PathBuf) -> Result<String, CommandError> {
    if async_std::fs::metadata(&dest).await.is_err() {
        debug!(" mkdir: {:?}", dest);
        match async_std::fs::create_dir_all(&dest).await {
            Ok(()) => (),
            Err(e) => {
                return Err(CommandError {
                    message: format!("Problem creating dirs for {:?}", dest),
                    cause: e.to_string(),
                })
            }
        };
    }

    let mut new_file = match async_std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(dest.join(file))
        .await
    {
        Ok(new_file) => new_file,
        Err(e) => {
            return Err(CommandError {
                message: format!("Problem creating file for {}", file),
                cause: e.to_string(),
            })
        }
    };

    let mut ftp_stream = match ap_ftp::connect_async().await {
        Ok(ftp_stream_async) => ftp_stream_async,
        Err(e) => return Err(e),
    };

    let mut file_stream = match ftp_stream.retr_as_stream(file).await {
        Ok(file_stream) => file_stream,
        Err(e) => {
            return Err(CommandError {
                message: format!("Problem creating stream for file {}", file),
                cause: e.to_string(),
            })
        }
    };

    let copy_response = match async_std::io::copy(&mut file_stream, &mut new_file).await {
        Ok(_bytes) => {
            format!("Successfully copied file: {}", file)
        }
        Err(e) => {
            return Err(CommandError {
                message: format!("Problem writing file {} from stream", file),
                cause: e.to_string(),
            })
        }
    };

    match ftp_stream.finalize_retr_stream(file_stream).await {
        Ok(()) => (),
        Err(e) => {
            return Err(CommandError {
                message: format!("Problem finalizing stream for file {}", file),
                cause: e.to_string(),
            })
        }
    }
        
    match ftp_stream.quit().await {
        Ok(()) => Ok(copy_response),
        Err(e) => {
            warn!("Could not close FTP session in discover: {}", e.to_string());
            return Ok(copy_response);
        }
    }
}
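
Because ap_ftp_copy never returns in the failing runs, one way to narrow this down might be to put an upper bound on each step so that a stall surfaces as an Err instead of an indefinite wait. The block below is only a minimal sketch of that idea, not part of the code above and not an API of the FTP crate: run_with_timeout and StepError are hypothetical names, and to stay dependency-free it uses a std thread with mpsc::Receiver::recv_timeout rather than async. In the async code above, the analogous approach would presumably be to wrap the retr_as_stream, io::copy and finalize_retr_stream awaits in async_std::future::timeout or tokio::time::timeout.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical error type: distinguishes "took too long" from "failed".
#[derive(Debug, PartialEq)]
pub enum StepError {
    /// The step did not report a result within the time budget.
    TimedOut { step: String, after: Duration },
    /// The step finished (or its thread died) and reported an error.
    Failed { step: String, cause: String },
}

/// Runs `work` on a helper thread and waits at most `limit` for its result.
///
/// Note: on timeout the helper thread is NOT cancelled; it keeps running in
/// the background. With async futures, dropping the timed-out future is what
/// cancels the work, so this is only an approximation of the async case.
pub fn run_with_timeout<T, F>(step: &str, limit: Duration, work: F) -> Result<T, StepError>
where
    T: Send + 'static,
    F: FnOnce() -> Result<T, String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // If the caller already timed out, the receiver is gone and `send`
        // fails; that is expected, so the error is ignored.
        let _ = tx.send(work());
    });

    match rx.recv_timeout(limit) {
        Ok(Ok(value)) => Ok(value),
        Ok(Err(cause)) => Err(StepError::Failed {
            step: step.to_string(),
            cause,
        }),
        Err(mpsc::RecvTimeoutError::Timeout) => Err(StepError::TimedOut {
            step: step.to_string(),
            after: limit,
        }),
        // The sender was dropped without sending, i.e. `work` panicked.
        Err(mpsc::RecvTimeoutError::Disconnected) => Err(StepError::Failed {
            step: step.to_string(),
            cause: "worker thread ended without a result".to_string(),
        }),
    }
}
```

Calling run_with_timeout("copy", Duration::from_secs(30), ...) around a step that stalls would return StepError::TimedOut after 30 seconds, and the step name in the error would show which stage (connect, stream creation, copy, finalize) is the one that hangs.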

Log

ap_ftp.log

moranbw added the question label on Jan 22, 2024