Merge pull request #8 from karthikbhamidipati/feat/add_support_for_aws_glue

Added support for AWS Glue
karthikbhamidipati committed Mar 26, 2024
2 parents d2a2299 + 24ac82e commit 54c8558
Showing 7 changed files with 157 additions and 20 deletions.
29 changes: 25 additions & 4 deletions README.md
@@ -14,17 +14,25 @@ See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more information

## Prerequisites

- Your app is built and run against Spark 3.x

### Spark on EMR

- To leverage any Spark plugin, your EMR cluster needs to run release 6.x or newer, and `spark.plugins` needs to be specified when a Spark job is submitted.

- Your app is built and run against Spark 3.x
### Spark on AWS Glue

- To leverage any Spark plugin, you need to be on AWS Glue 3 or newer, and `spark.plugins` needs to be specified when a Glue job is submitted.

## Onboarding Steps

1. Create a profiling group in CodeGuru Profiler and grant permission to your EMR EC2 role so that profiler agents can emit metrics to CodeGuru. Detailed instructions can be found [here](https://docs.aws.amazon.com/codeguru/latest/profiler-ug/setting-up-long.html).
- Create a profiling group in CodeGuru Profiler and grant permission to your EMR EC2 role or AWS Glue job role so that profiler agents can emit metrics to CodeGuru. Detailed instructions can be found [here](https://docs.aws.amazon.com/codeguru/latest/profiler-ug/setting-up-long.html).

![](resources/images/profiling-group.gif)

2. Reference `codeguru-profiler-for-spark` via `--packages` (or `--jars`) when submitting your Spark job, along with `PROFILING_CONTEXT` and `ENABLE_AMAZON_PROFILER` defined. Below is an example where the profiling group created in the previous step is assumed to be `CodeGuru-Spark-Demo`.
### Spark on EMR

- Reference `codeguru-profiler-for-spark` via `--packages` (or `--jars`) when submitting your Spark job, along with `PROFILING_CONTEXT` and `ENABLE_AMAZON_PROFILER` defined. Below is an example where the profiling group created in the previous step is assumed to be `CodeGuru-Spark-Demo`.

```
spark-submit \
@@ -41,7 +49,7 @@
<the-s3-object-key-of-your-spark-app-jar>
```
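The collapsed portion of the block above hides most of the flags. A fuller invocation might look like the following sketch, where the package coordinates are taken from the project's `pom.xml` and every other value is an assumption assembled from the surrounding text, not the diff's actual content:

```
spark-submit \
  --packages software.amazon.profiler:codeguru-profiler-for-spark:1.2 \
  --conf spark.plugins=software.amazon.profiler.AmazonProfilerPlugin \
  --conf spark.yarn.appMasterEnv.ENABLE_AMAZON_PROFILER=true \
  --conf "spark.yarn.appMasterEnv.PROFILING_CONTEXT={\"profilingGroupName\":\"CodeGuru-Spark-Demo\",\"driverEnabled\":\"true\"}" \
  --conf spark.executorEnv.ENABLE_AMAZON_PROFILER=true \
  --conf "spark.executorEnv.PROFILING_CONTEXT={\"profilingGroupName\":\"CodeGuru-Spark-Demo\"}" \
  <the-s3-object-key-of-your-spark-app-jar>
```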

An alternative way to specify `PROFILING_CONTEXT` and `ENABLE_AMAZON_PROFILER` is via the AWS EMR web console: go to the Configurations tab of your EMR cluster and define both environment variables under the `yarn-env.export` classification for instance groups. Please note that `PROFILING_CONTEXT`, if configured in the web console, needs all of its commas escaped, in addition to the escaping already required for the spark-submit command above.
- An alternative way to specify `PROFILING_CONTEXT` and `ENABLE_AMAZON_PROFILER` is via the AWS EMR web console: go to the Configurations tab of your EMR cluster and define both environment variables under the `yarn-env.export` classification for instance groups. Please note that `PROFILING_CONTEXT`, if configured in the web console, needs all of its commas escaped, in addition to the escaping already required for the spark-submit command above.
```json
[{
  "classification": "yarn-env",
  "configurations": [{
    "classification": "export",
    "properties": {
      "ENABLE_AMAZON_PROFILER": "true",
      "PROFILING_CONTEXT": "{\"profilingGroupName\":\"CodeGuru-Spark-Demo\"\\,\"driverEnabled\":\"true\"}"
    }
  }]
}]
```

### Spark on AWS Glue

- Upload `codeguru-profiler-for-apache-spark.jar` to S3 and pass its S3 path via the `--extra-jars` parameter of the AWS Glue API. More details on AWS Glue job arguments can be found [here](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html#w6aac28c11b8c11).
- Then specify the `PROFILING_CONTEXT` and `ENABLE_AMAZON_PROFILER` properties via the `--conf` parameter of the AWS Glue API. A sample value for `--conf` would look like:
`spark.plugins=software.amazon.profiler.AmazonProfilerPlugin --conf spark.executorEnv.ENABLE_AMAZON_PROFILER=true --conf spark.executorEnv.PROFILING_CONTEXT={"profilingGroupName":"CodeGuru-Spark-Demo"} --conf spark.yarn.appMasterEnv.ENABLE_AMAZON_PROFILER=true --conf spark.yarn.appMasterEnv.PROFILING_CONTEXT={"profilingGroupName":"CodeGuru-Spark-Demo", "driverEnabled": "true"}`.

***Note:*** AWS Glue doesn't support passing multiple `--conf` parameters, so when you need to pass more than one, such as `--conf k1=v1 --conf k2=v2`, the key and value for the Glue API would look like this:
*Key:* `--conf`
*Value:* `k1=v1 --conf k2=v2`
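Assembled into a Glue job definition, the two parameters above might look like the following sketch of a `DefaultArguments` map (the bucket name is a placeholder, and the exact map shape is an assumption based on the Glue job-arguments convention):

```json
{
  "--extra-jars": "s3://your-bucket/codeguru-profiler-for-apache-spark.jar",
  "--conf": "spark.plugins=software.amazon.profiler.AmazonProfilerPlugin --conf spark.executorEnv.ENABLE_AMAZON_PROFILER=true --conf spark.executorEnv.PROFILING_CONTEXT={\"profilingGroupName\":\"CodeGuru-Spark-Demo\"}"
}
```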


## Troubleshooting Tips

### Spark on EMR

If profiling results do not show up in the CodeGuru web console of your AWS account, you can fire off a Spark shell from the master node of your EMR cluster and check whether the environment variables are set up correctly.
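As a sketch of that check (the variable names come from the onboarding steps above; the exact commands are illustrative):

```shell
# On the EMR master node: print each profiler variable, or a notice if it is unset
printenv ENABLE_AMAZON_PROFILER || echo "ENABLE_AMAZON_PROFILER is not set"
printenv PROFILING_CONTEXT || echo "PROFILING_CONTEXT is not set"
```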
4 changes: 2 additions & 2 deletions pom.xml
@@ -6,7 +6,7 @@

<groupId>software.amazon.profiler</groupId>
<artifactId>codeguru-profiler-for-spark</artifactId>
<version>1.1</version>
<version>1.2</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
@@ -48,7 +48,7 @@
<scala-binary.version>2.12</scala-binary.version>
<scala.version>${scala-binary.version}.8</scala.version>
<spark.version>3.4.0</spark.version>
<profiler.version>1.2.1</profiler.version>
<profiler.version>1.2.2</profiler.version>
<slf4j.version>1.7.32</slf4j.version>
<lombok.version>1.18.22</lombok.version>
<junit.version>5.8.1</junit.version>
8 changes: 5 additions & 3 deletions src/main/java/software/amazon/profiler/BasePlugin.java
@@ -22,7 +22,9 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import software.amazon.codeguruprofilerjavaagent.Profiler;
import software.amazon.profiler.utils.SparkConfUtils;

/**
* A base class interacts with AWS CodeGuru to start and stop profiling.
@@ -72,10 +74,10 @@ public Profiler createProfiler(String profilingGroupName, boolean heapSummaryEnabled
.build();
}

public ProfilingContext getContext() throws IOException {
if ("true".equals(System.getenv("ENABLE_AMAZON_PROFILER"))) {
public ProfilingContext getContext(SparkConf conf, boolean isDriver) throws IOException {
if ("true".equals(SparkConfUtils.getValueFromEnvOrSparkConf(conf, isDriver, "ENABLE_AMAZON_PROFILER"))) {
log.info("Profiling is enabled");
String json = System.getenv("PROFILING_CONTEXT");
String json = SparkConfUtils.getValueFromEnvOrSparkConf(conf, isDriver, "PROFILING_CONTEXT");
if (json != null) {
return new ObjectMapper().readValue(json, ProfilingContext.class);
}
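For reference, the `PROFILING_CONTEXT` JSON consumed by `getContext` carries the fields exercised in the tests below; a representative value (the group name is an example) would be:

```json
{
  "profilingGroupName": "CodeGuru-Spark-Demo",
  "driverEnabled": "true",
  "executorEnabled": "true",
  "heapSummaryEnabled": "false"
}
```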
src/main/java/software/amazon/profiler/SparkDriverPlugin.java
@@ -36,7 +36,7 @@ public class SparkDriverPlugin extends BasePlugin implements DriverPlugin {
public Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
SERVICE.submit(() -> {
try {
ProfilingContext context = getContext();
ProfilingContext context = getContext(pluginContext.conf(), true);
if (context != null && context.isDriverEnabled()) {
log.info("Profiling context: " + context);
startProfiler(context.getProfilingGroupName(), context.isHeapSummaryEnabled(), 1.00);
src/main/java/software/amazon/profiler/SparkExecutorPlugin.java
@@ -34,7 +34,7 @@ public class SparkExecutorPlugin extends BasePlugin implements ExecutorPlugin {
public void init(PluginContext ctx, Map<String, String> extraConf) {
SERVICE.submit(() -> {
try {
ProfilingContext context = getContext();
ProfilingContext context = getContext(ctx.conf(), false);
if (context != null && context.isExecutorEnabled()) {
log.info("Profiling context: " + context);
startProfiler(context.getProfilingGroupName(), context.isHeapSummaryEnabled(), context.getProbability());
14 changes: 14 additions & 0 deletions src/main/java/software/amazon/profiler/utils/SparkConfUtils.java
@@ -0,0 +1,14 @@
package software.amazon.profiler.utils;

import org.apache.spark.SparkConf;

public class SparkConfUtils {
    public static String getValueFromEnvOrSparkConf(SparkConf conf, boolean isDriver, String propertyName) {
        String result = System.getenv(propertyName);
        if (result != null) {
            return result;
        }
        String prefix = isDriver ? "spark.yarn.appMasterEnv" : "spark.executorEnv";
        return conf.get(String.format("%s.%s", prefix, propertyName), null);
    }
}
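The lookup order above (process environment first, then the driver- or executor-prefixed Spark property) can be sketched without a Spark dependency, using plain maps to stand in for `System.getenv` and `SparkConf`; the class and method names here are illustrative, not part of the project:

```java
import java.util.HashMap;
import java.util.Map;

public class EnvOrConfSketch {
    // Mirrors SparkConfUtils.getValueFromEnvOrSparkConf: the environment wins,
    // then the driver/executor-prefixed Spark property, then null.
    static String lookup(Map<String, String> env, Map<String, String> conf,
                         boolean isDriver, String name) {
        String fromEnv = env.get(name);
        if (fromEnv != null) {
            return fromEnv;
        }
        String prefix = isDriver ? "spark.yarn.appMasterEnv" : "spark.executorEnv";
        return conf.get(prefix + "." + name);
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.executorEnv.ENABLE_AMAZON_PROFILER", "true");

        // Executor lookup falls through to the Spark property...
        System.out.println(lookup(env, conf, false, "ENABLE_AMAZON_PROFILER")); // true
        // ...but an environment variable, when present, takes precedence.
        env.put("ENABLE_AMAZON_PROFILER", "false");
        System.out.println(lookup(env, conf, false, "ENABLE_AMAZON_PROFILER")); // false
    }
}
```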
118 changes: 109 additions & 9 deletions src/test/java/software/amazon/profiler/BasePluginTest.java
@@ -14,6 +14,7 @@
*/
package software.amazon.profiler;

import org.apache.spark.SparkConf;
import software.amazon.codeguruprofilerjavaagent.Profiler;

import com.github.stefanbirkner.systemlambda.SystemLambda;
@@ -81,22 +82,22 @@ public void testCreateProfiler() {
}

@Test
public void testGetContextWithoutEnvDefined() throws Exception {
Assertions.assertNull(new BasePlugin().getContext());
public void testGetContextWithoutEnvOrSparkConfDefined() throws Exception {
Assertions.assertNull(new BasePlugin().getContext(new SparkConf(), false));
}

@Test
public void testGetContextWithFirstEnvDefined() throws Exception {
ProfilingContext context = SystemLambda.withEnvironmentVariable("ENABLE_AMAZON_PROFILER", "true")
.execute(() -> new BasePlugin().getContext());
.execute(() -> new BasePlugin().getContext(new SparkConf(), false));
Assertions.assertNull(context);
}

@Test
public void testGetContext() throws Exception {
public void testGetContextWithEnvDefined() throws Exception {
ProfilingContext context = SystemLambda.withEnvironmentVariable("ENABLE_AMAZON_PROFILER", "true")
.and("PROFILING_CONTEXT", "{\"profilingGroupName\":\"Sample-Spark-App-Beta\"}")
.execute(() -> new BasePlugin().getContext());
.execute(() -> new BasePlugin().getContext(new SparkConf(), false));

Assertions.assertEquals("Sample-Spark-App-Beta", context.getProfilingGroupName());
Assertions.assertFalse(context.isDriverEnabled());
Expand All @@ -105,10 +106,10 @@ public void testGetContext() throws Exception {
}

@Test
public void testGetContextWithAllFlagsEnabled() throws Exception {
public void testGetContextWithAllFlagsEnabledInEnv() throws Exception {
ProfilingContext context = SystemLambda.withEnvironmentVariable("ENABLE_AMAZON_PROFILER", "true")
.and("PROFILING_CONTEXT", "{\"profilingGroupName\":\"Sample-Spark-App-Beta\",\"driverEnabled\":\"true\"}")
.execute(() -> new BasePlugin().getContext());
.execute(() -> new BasePlugin().getContext(new SparkConf(), false));

Assertions.assertEquals("Sample-Spark-App-Beta", context.getProfilingGroupName());
Assertions.assertTrue(context.isDriverEnabled());
Expand All @@ -117,15 +118,114 @@ public void testGetContextWithAllFlagsEnabled() throws Exception {
}

@Test
public void testGetContextWithAllFlagsDisabled() throws Exception {
public void testGetContextWithAllFlagsDisabledInEnv() throws Exception {
ProfilingContext context = SystemLambda.withEnvironmentVariable("ENABLE_AMAZON_PROFILER", "true")
.and("PROFILING_CONTEXT", "{\"profilingGroupName\":\"Sample-Spark-App-Gamma\",\"executorEnabled\":\"false\",\"heapSummaryEnabled\":\"false\"}")
.execute(() -> new BasePlugin().getContext());
.execute(() -> new BasePlugin().getContext(new SparkConf(), false));

Assertions.assertEquals("Sample-Spark-App-Gamma", context.getProfilingGroupName());
Assertions.assertFalse(context.isDriverEnabled());
Assertions.assertFalse(context.isExecutorEnabled());
Assertions.assertFalse(context.isHeapSummaryEnabled());
}

@Test
public void testGetContextWithFirstSparkConfDefinedInExecutor() throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.executorEnv.ENABLE_AMAZON_PROFILER", "true");
ProfilingContext context = new BasePlugin().getContext(sparkConf, false);
Assertions.assertNull(context);
}

@Test
public void testGetContextWithBothSparkConfDefinedInExecutor() throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.executorEnv.ENABLE_AMAZON_PROFILER", "true");
sparkConf.set("spark.executorEnv.PROFILING_CONTEXT", "{\"profilingGroupName\":\"Sample-Spark-App-Beta\"}");

ProfilingContext context = new BasePlugin().getContext(sparkConf, false);

Assertions.assertEquals("Sample-Spark-App-Beta", context.getProfilingGroupName());
Assertions.assertFalse(context.isDriverEnabled());
Assertions.assertTrue(context.isExecutorEnabled());
Assertions.assertTrue(context.isHeapSummaryEnabled());
}

@Test
public void testGetContextWithAllFlagsEnabledInSparkConfForExecutor() throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.executorEnv.ENABLE_AMAZON_PROFILER", "true");
sparkConf.set("spark.executorEnv.PROFILING_CONTEXT", "{\"profilingGroupName\":\"Sample-Spark-App-Beta\",\"driverEnabled\":\"true\"}");

ProfilingContext context = new BasePlugin().getContext(sparkConf, false);

Assertions.assertEquals("Sample-Spark-App-Beta", context.getProfilingGroupName());
Assertions.assertTrue(context.isDriverEnabled());
Assertions.assertTrue(context.isExecutorEnabled());
Assertions.assertTrue(context.isHeapSummaryEnabled());
}

@Test
public void testGetContextWithAllFlagsDisabledInSparkConfForExecutor() throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.executorEnv.ENABLE_AMAZON_PROFILER", "true");
sparkConf.set("spark.executorEnv.PROFILING_CONTEXT", "{\"profilingGroupName\":\"Sample-Spark-App-Gamma\",\"executorEnabled\":\"false\",\"heapSummaryEnabled\":\"false\"}");

ProfilingContext context = new BasePlugin().getContext(sparkConf, false);

Assertions.assertEquals("Sample-Spark-App-Gamma", context.getProfilingGroupName());
Assertions.assertFalse(context.isDriverEnabled());
Assertions.assertFalse(context.isExecutorEnabled());
Assertions.assertFalse(context.isHeapSummaryEnabled());
}

@Test
public void testGetContextWithFirstSparkConfDefinedInDriver() throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.yarn.appMasterEnv.ENABLE_AMAZON_PROFILER", "true");
ProfilingContext context = new BasePlugin().getContext(sparkConf, true);
Assertions.assertNull(context);
}

@Test
public void testGetContextWithBothSparkConfDefinedInDriver() throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.yarn.appMasterEnv.ENABLE_AMAZON_PROFILER", "true");
sparkConf.set("spark.yarn.appMasterEnv.PROFILING_CONTEXT", "{\"profilingGroupName\":\"Sample-Spark-App-Beta\"}");

ProfilingContext context = new BasePlugin().getContext(sparkConf, true);

Assertions.assertEquals("Sample-Spark-App-Beta", context.getProfilingGroupName());
Assertions.assertFalse(context.isDriverEnabled());
Assertions.assertTrue(context.isExecutorEnabled());
Assertions.assertTrue(context.isHeapSummaryEnabled());
}

@Test
public void testGetContextWithAllFlagsEnabledInSparkConfForDriver() throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.yarn.appMasterEnv.ENABLE_AMAZON_PROFILER", "true");
sparkConf.set("spark.yarn.appMasterEnv.PROFILING_CONTEXT", "{\"profilingGroupName\":\"Sample-Spark-App-Beta\",\"driverEnabled\":\"true\"}");

ProfilingContext context = new BasePlugin().getContext(sparkConf, true);

Assertions.assertEquals("Sample-Spark-App-Beta", context.getProfilingGroupName());
Assertions.assertTrue(context.isDriverEnabled());
Assertions.assertTrue(context.isExecutorEnabled());
Assertions.assertTrue(context.isHeapSummaryEnabled());
}

@Test
public void testGetContextWithAllFlagsDisabledInSparkConfForDriver() throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.yarn.appMasterEnv.ENABLE_AMAZON_PROFILER", "true");
sparkConf.set("spark.yarn.appMasterEnv.PROFILING_CONTEXT", "{\"profilingGroupName\":\"Sample-Spark-App-Gamma\",\"executorEnabled\":\"false\",\"heapSummaryEnabled\":\"false\"}");

ProfilingContext context = new BasePlugin().getContext(sparkConf, true);

Assertions.assertEquals("Sample-Spark-App-Gamma", context.getProfilingGroupName());
Assertions.assertFalse(context.isDriverEnabled());
Assertions.assertFalse(context.isExecutorEnabled());
Assertions.assertFalse(context.isHeapSummaryEnabled());
}
}
