[BugFix] retry get spark state from spark client log when yarn queue expire #45695

Open

blanklin030 wants to merge 1 commit into main from fix-get-spark-state-when-yarn-expire

Conversation

@blanklin030 (Contributor) commented May 15, 2024

Why I'm doing:

YARN keeps two application lists: one for running applications and one for completed applications. To limit memory usage, the number of retained completed applications is capped (5000 in this configuration). This creates a boundary problem: once YARN evicts an application id from the completed list, the asynchronous load ETL checker thread, which polls the application state every 5 seconds, can no longer find the application and cannot determine its final state.
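
To make the boundary concrete, here is a minimal sketch (not StarRocks code; a standalone probe using the standard Hadoop YARN client API) showing that once the ResourceManager has evicted a completed application, polling its report fails with ApplicationNotFoundException, so the checker cannot tell success from failure:

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;

public class YarnStateProbe {
    public static void main(String[] args) throws Exception {
        // yarn.resourcemanager.max-completed-applications bounds how many finished
        // applications the RM remembers; older entries are evicted.
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();
        try {
            ApplicationId appId = ApplicationId.fromString(args[0]);
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            System.out.println("state: " + report.getYarnApplicationState());
        } catch (ApplicationNotFoundException e) {
            // The boundary case: the application finished, but its record is gone,
            // so YARN alone can no longer tell us whether it succeeded or failed.
            System.out.println("application record already evicted: " + args[0]);
        } finally {
            yarnClient.stop();
        }
    }
}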

What I'm doing:

After the Spark task is submitted, the spark-submit client log is kept and persisted to disk. When the boundary problem occurs later (YARN no longer knows the application id), the persisted client log is parsed again to recover the actual execution state of the task.
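
For illustration only, a minimal sketch of the parsing idea (this is not the PR's SparkClientLogHelper; the regular expressions and the assumed line format, e.g. "Application report for application_1715760000000_0001 (state: RUNNING)", are assumptions about the spark-submit client output):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class ClientLogParser {
    // Assumed format of the yarn-cluster client log lines emitted by spark-submit.
    private static final Pattern APP_ID_PATTERN = Pattern.compile("application_\\d+_\\d+");
    private static final Pattern STATE_PATTERN = Pattern.compile("state:\\s*([A-Z_]+)");

    // Returns the application id found on this line, or null if none.
    public static String parseAppId(String line) {
        Matcher m = APP_ID_PATTERN.matcher(line);
        return m.find() ? m.group() : null;
    }

    // Returns the YARN state name found on this line, or null if none.
    public static String parseState(String line) {
        Matcher m = STATE_PATTERN.matcher(line);
        return m.find() ? m.group(1) : null;
    }
}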

Fixes #45694

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This PR needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport PR

Bugfix cherry-pick branch check:

  • I have checked the version labels so that this PR will be auto-backported to the target branches
    • 3.3
    • 3.2
    • 3.1
    • 3.0
    • 2.5

@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.

BlankLin seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account.
You have already signed the CLA but the status is still pending? Let us recheck it.


The most risky bug in this code is:

YarnApplicationState.valueOf(state) may throw IllegalArgumentException if the input state does not correspond to any YarnApplicationState enum value

You can modify the code like this:

if (line.contains(STATE)) {
    // 1. state
    String state = SparkClientLogHelper.regexGetState(line);
    if (state != null) {
        try {
            YarnApplicationState yarnState = YarnApplicationState.valueOf(state);
            newState = SparkClientLogHelper.fromYarnState(yarnState);
            if (newState != oldState) {
                sparkLoadAppHandle.setState(newState);
            }
        } catch (IllegalArgumentException e) {
            LOG.warn("Invalid YarnApplicationState: {}", state);
        }
    }
    // 2. appId
    String appId = SparkClientLogHelper.regexGetAppId(line);
    if (appId != null && !appId.equals(sparkLoadAppHandle.getAppId())) {
        sparkLoadAppHandle.setAppId(appId);
    }
}

This adjustment includes a try-catch block around YarnApplicationState.valueOf(state) to gracefully handle cases where the state from the log doesn't exactly match any YarnApplicationState enums, avoiding a potential IllegalArgumentException that could disrupt the flow or crash the application.
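
For context, here is a rough sketch of what a YARN-to-load-state mapping like fromYarnState could look like (the LoadState enum below is hypothetical and only for illustration; the PR's actual SparkLoadAppHandle states may differ):

import org.apache.hadoop.yarn.api.records.YarnApplicationState;

public final class YarnStateMapper {
    // Hypothetical coarse load states used only for this illustration.
    public enum LoadState { UNKNOWN, SUBMITTED, RUNNING, FINISHED, FAILED, KILLED }

    public static LoadState fromYarnState(YarnApplicationState yarnState) {
        switch (yarnState) {
            case SUBMITTED:
            case ACCEPTED:
                return LoadState.SUBMITTED;
            case RUNNING:
                return LoadState.RUNNING;
            case FINISHED:
                return LoadState.FINISHED;
            case FAILED:
                return LoadState.FAILED;
            case KILLED:
                return LoadState.KILLED;
            default:
                // NEW / NEW_SAVING and any future values fall back to UNKNOWN.
                return LoadState.UNKNOWN;
        }
    }
}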


The most risky bug in this code is:
The BufferedReader is never closed, which leaks the underlying process stream; in addition, the "rewrite spark client log fail." case can occur without being handled or logged.

You can modify the code like this:

@@ -125,7 +124,12 @@ public void run() {
             try {
+                // Initialize BufferedReader here to ensure it's closed properly in the finally block
                 outReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+            } catch (Exception e) {
+                LOG.warn("Error initializing BufferedReader.", e);
+            }
+
             try {
                 while (!isStop && (line = outReader.readLine()) != null) {
@@ -187,6 +191,11 @@ public void run() {
                             }
                         }
                     }
+                    SparkClientLogHelper.readLine4OtherValues(line, handle);
+                }
+            } finally {
+                if (outReader != null) {
+                    outReader.close();
                 }
             } catch (Exception e) {

In this modification, I've ensured that the BufferedReader, outReader, is declared outside the initial try block so it can be closed properly in the finally block. This prevents a potential resource leak. Additionally, I removed the unnecessary stream closure for outputStream as it seems it has been removed from the provided context and might have been part of an earlier version of the code.

I also ensured there is proper logging if initializing the BufferedReader fails. Handling resources accurately is crucial to prevent memory leaks or locked files, which can lead to application failure or degraded performance over time.
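
An alternative that avoids the manual close() bookkeeping entirely is try-with-resources; a minimal sketch (the process wiring and the placeholder parsing step are assumptions, not the PR's exact code):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public final class ClientLogTailer {
    // Reads the spark-submit client output line by line. try-with-resources
    // guarantees the reader (and the underlying stream) is closed even if
    // parsing throws, which avoids the resource leak described above.
    public static void tail(Process process) {
        try (BufferedReader outReader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = outReader.readLine()) != null) {
                // parse appId / state from each line here (e.g. with regex helpers)
            }
        } catch (IOException e) {
            System.err.println("read spark client log failed: " + e.getMessage());
        }
    }
}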


The most risky bug in this code is:
Log file paths are handled inconsistently: setRedirectLogPath(logFilePath) was removed without a clear replacement, so it is ambiguous where the client log is written to and read from.

You can modify the code like this:

@@ -124,6 +124,7 @@
         String jobStageHdfsPath = resource.getWorkingDir();
         // spark launcher log path
         String logFilePath = Config.spark_launcher_log_dir + "/" + String.format(LAUNCHER_LOG, loadJobId, loadLabel);
+        handle.setLogPath(logFilePath); // Ensure that the log path is being set correctly here
 
@@ -175,7 +176,6 @@
             if (!FeConstants.runningUnitTest) {
                 SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle);
                 logMonitor.setSubmitTimeoutMs(sparkLoadSubmitTimeout);
-                // logMonitor.setRedirectLogPath(logFilePath); Incorrectly removed, reconsider or ensure alternate method
+                logMonitor.setLogPath(logFilePath); // Correct approach: Ensure redirection or similar functionality is encapsulated within setLogPath or another adequately named method.
                 logMonitor.start();
                 try {
                     logMonitor.join();

This change makes log path handling clearer and more consistent. When refactoring, it is important to track how logging is wired through the application; otherwise logs can end up in the wrong place, or critical debugging information can be lost.

…expir

Signed-off-by: BlankLin <luck.linxiaoyu@gmail.com>

@blanklin030 force-pushed the fix-get-spark-state-when-yarn-expire branch from 5d54cc2 to 490e2a7 on May 20, 2024, 15:55

[FE Incremental Coverage Report]

fail : 23 / 37 (62.16%)

file detail

path | covered lines | new lines | coverage | uncovered lines
🔵 com/starrocks/load/loadv2/SparkEtlJobHandler.java | 15 | 27 | 55.56% | [266, 267, 285, 292, 293, 294, 295, 298, 299, 300, 301, 303]
🔵 com/starrocks/load/loadv2/SparkLauncherMonitor.java | 8 | 10 | 80.00% | [167, 168]


[BE Incremental Coverage Report]

pass : 0 / 0 (0%)
