
Implement runner for apache spark #133

Open
moradology opened this issue Nov 15, 2023 · 7 comments
@moradology
Contributor

The goal here is fairly straightforward and the introductory materials mostly live here: https://beam.apache.org/documentation/runners/spark/. They indicate, however, that there will likely be (at least?) two different spark runners to accommodate different spark cluster deployments. I'm going to try to capture what I've learned so far, since the devil is really in the (fairly obscure) details here and familiarity with spark alone isn't enough to make things legible without some heavy iteration.

Rationale for multiple spark runners

Roughly, spark clusters can run in 'standalone' mode with the spark master listening on port 7077 (this is what the apache-beam documentation writers seem to greatly prefer), or jobs can be submitted via spark-submit (which is how YARN- and Mesos-orchestrated clusters do their thing, and so is probably valuable to support).

Beam jobserver route

In the case of port 7077 job submission, apache beam provides a pretty convenient docker image which serves as the intermediary between a python process which describes/returns the beam pipeline and the cluster it connects to via port 7077 on the cluster's master. Here, any runner we'd implement probably needs little (nothing?) more than these two implementation-specific parameters:
[image]
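
For concreteness, here's roughly what that submission looks like from the python side. This is a minimal sketch, assuming the job server is the apache/beam_spark_job_server image pointed at spark://<master-host>:7077; the job endpoint host below is illustrative (8099 is the job server's default port):

from apache_beam.options.pipeline_options import PipelineOptions

# Sketch: submit through a running Beam job server rather than directly to spark.
# Assumes something like
#   docker run --net=host apache/beam_spark_job_server:latest \
#       --spark-master-url=spark://<master-host>:7077
# is already up and reachable.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=<job-server-host>:8099",
    "--environment_type=DOCKER",  # workers run user code inside the Beam SDK harness container
])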

This isn't the way to work with EMR, as EMR is YARN backed. A different strategy is necessary.

Beam stuff-everything-into-a-jar-and-pray-your-workers-have-docker-permissions route

As stated, EMR orchestrates spark via YARN. YARN wants to see spark-submit being used and doesn't care what happens on port 7077 (it isn't listening). That's fine. Actually, apache-beam has some tooling for this:
[image]
What happens when this is run is that a jar file pops out at the specified path. This jar file is exactly the kind of thing YARN wants to consume. EMR "steps" are totally capable of running a spark-submit pointing at some location on S3 and, in fact, this is how things are designed to work.
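
My best guess (hedged, since the screenshot doesn't survive here) is that the tooling above is the portable runner's --output_executable_path option. If so, the python side would look something like this sketch; the jar path and PROCESS config are illustrative:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch: with --output_executable_path set, running the pipeline stages it into
# a fat jar at the given path instead of executing it; that jar is what an EMR
# step can later spark-submit from S3.
options = PipelineOptions([
    "--runner=SparkRunner",
    "--output_executable_path=/tmp/beam-pipeline-bundle.jar",
    "--environment_type=PROCESS",
    '--environment_config={"command": "/home/hadoop/boot"}',
])

with beam.Pipeline(options=options) as p:
    p | beam.Create(["placeholder"])  # the real recipe pipeline would go here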

There's a bit of a wrinkle here, unfortunately. As it turns out, regardless of the SDK harness environment one chooses (EXTERNAL, PROCESS, or, of course, DOCKER), beam jobs specified in python will absolutely, positively, 100% try to spawn docker containers to actually run the code. This is somewhat surprising, as the SDK docs here would seem to indicate that PROCESS ought to use a compiled boot binary. Well, I compiled that binary and included it in the EMR bootstrap step and it absolutely still attempted to run everything via docker (😢).
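
For reference, this is the PROCESS configuration as I understand it from the docs (the boot path matches the binary built later in this thread); whether the runner actually honors it instead of falling back to docker is exactly the problem described above:

from apache_beam.options.pipeline_options import PipelineOptions

# Sketch of the PROCESS harness config: workers are supposed to exec the
# compiled boot binary directly rather than pulling an SDK container.
options = PipelineOptions([
    "--environment_type=PROCESS",
    '--environment_config={"command": "/home/hadoop/boot"}',
])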

OK, so why is this a problem?

Well, as it turns out, EMR does not grant the hadoop user appropriate permissions to spawn docker containers, and there's no obvious workaround via AWS's standard customization path of providing a bootstrap script (short of hacking the expected permissions on the various files the docker group grants access to, which seems bad and kind of dangerous).
[image]
Having unravelled this mysterious behavior, I stumbled across a relevant issue which still seems to be open:
https://issues.apache.org/jira/browse/BEAM-11378
apache/beam#20568

Here's a taste of the bootstrap script that I was attempting to modify permissions with. newgrp is a dead end in bootstrap and will cause your cluster to hang for potentially hours without ANY useful debugging information:
[image]

Is there any workaround?

Yeah, I think there are a couple, though I've not proven them out yet.

  1. Bake a custom AMI that adjusts worker permissions so that bootstrap doesn't have to.
  2. Use a custom docker image for EMR and accept that we're going to run Docker in Docker (in Docker) 🤮
@moradology moradology self-assigned this Nov 15, 2023
@ranchodeluxe
Collaborator

This is somewhat surprising, as the SDK docs here would seem to indicate that PROCESS ought to use a compiled boot binary. Well, I compiled that binary and included it in the EMR bootstrap step and it absolutely still attempts to run everything via docker (😢).

Can this be ticketed or asked somewhere? Sounds like a pretty incredible bug if so

@moradology
Contributor Author

I'm thinking it might be appropriate to add some detail to the issue mentioned above that already deals with docker perms on the beam repo (apache/beam#20568).

@moradology
Contributor Author

moradology commented Dec 4, 2023

OK, so @ranchodeluxe's pushback here was so sensible and my understanding of what on earth was going wrong so inchoate that I went back to double-check use of the PROCESS SDK harness. To make a long story short: I believe he was right, that I was missing something, and that there's a path to EMR Serverless running spark jobs to back Beam pipelines.

Slightly longer story:
Bugged by the above comment, I went back to the Beam source (built via gradle) and attempted to make sense of whether and when boot attempts to summon containers to execute workloads. There's quite a bit going on here, but it appears that boot is constructed as the entrypoint to docker for standard DOCKER SDK harness environments. This suggested that if the appropriate settings are available wherever boot is built and invoked, we should expect things to run.

The next matter is how best to approach environment construction. The most recent Beam release advertises Spark support up to 3.2.1, but I've spent a bit of time digging around and it appears that people have used it with later versions. 3.3.0, at least, seems fine (if this seems like it shouldn't matter, rest assured that, at least when dealing with spark, minor versions demand respect). The reason this turns out to be useful is that the earliest EMR Serverless release that allows container customization is 6.9.0, and EMR Serverless 6.9.0 is pinned to Spark 3.3.0.

OK, so this relatively minimal image is built and then pushed up to ECR:

FROM public.ecr.aws/emr-serverless/spark/emr-6.9.0:latest

USER root

# Install development tools and dependencies
RUN yum update -y && \
    yum install git golang -y && \
    yum clean all

RUN pip3 install 'urllib3<1.27,>=1.25.4' boto3 apache-beam[aws]

# Build the binary to boot beam processes
RUN cd /mnt && git clone --branch v2.51.0 --single-branch https://github.com/apache/beam.git
RUN cd /mnt/beam && ./gradlew :sdks:python:container:build 
RUN cd /mnt/beam && cp sdks/python/container/build/target/launcher/linux_amd64/boot /home/hadoop/boot
RUN rm -rf /mnt/beam

USER hadoop:hadoop

With that container in place, a few permissions need to be set (VPC settings for internet access, but also a trust policy allowing the EMR Serverless principal to access ECR data). Also, any special runtime permissions will need to be added to an execution role at this point; the default in the console may well be fine for accessing S3 on the owner's account. When constructing a serverless application, remember to refer to the image pushed up to ECR:
[image]
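
Equivalently, for anyone scripting this rather than clicking through the console, here's a boto3 sketch; the application name and image URI are placeholders, and emr-6.9.0 is the release pinned to Spark 3.3.0 as discussed above:

import boto3

# Sketch: create the serverless application with the custom ECR image.
emr_serverless = boto3.client("emr-serverless")
emr_serverless.create_application(
    name="beam-spark-runner-test",
    releaseLabel="emr-6.9.0",
    type="SPARK",
    imageConfiguration={
        "imageUri": "<account-id>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>",
    },
)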

OK, so with this infra in place, I wrote a very simple, very stupid test script that counts words in Moby Dick and which hopefully invokes some parallelism. Here's that:

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run():
    options = PipelineOptions([
    ])
    with beam.Pipeline(options=options) as p:
        (p
         | 'ReadFromText' >> beam.io.ReadFromText('s3://nzimmerman-testing/texts/mobydick.txt')
         | 'SplitWords' >> beam.FlatMap(lambda x: x.split())
         | 'CountWords' >> beam.combiners.Count.PerElement()
         | 'FormatResults' >> beam.Map(lambda word_count: f"{word_count[0]}: {word_count[1]}")
         | 'WriteToText' >> beam.io.WriteToText('s3://nzimmerman-testing/beam/output2')
        )

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

From there, running the beam job is just a matter of referring to the boot binary, telling it the correct runner (not the portable runner!), and submitting the location (in this case on s3) of the beam pipeline (here's what that looks like in the console):
[image]
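
Scripted rather than clicked, the console submission above corresponds roughly to this boto3 sketch (application id, role ARN, and S3 paths are placeholders, and it assumes the pipeline script forwards its command-line arguments into PipelineOptions):

import boto3

# Sketch: mirrors the console submission in the screenshot; placeholders throughout.
emr_serverless = boto3.client("emr-serverless")
emr_serverless.start_job_run(
    applicationId="<application-id>",
    executionRoleArn="<execution-role-arn>",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://<bucket>/beam/wordcount.py",
            "entryPointArguments": [
                "--runner=SparkRunner",  # not the portable runner
                "--environment_type=PROCESS",
                '--environment_config={"command": "/home/hadoop/boot"}',
            ],
        }
    },
)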

Oh, and here are some output files with counts of words in Moby Dick:
[image]

@cisaacstern
Member

@moradology this is awesome, and also I love that you have your own copy of Moby Dick stored on s3 😃

@moradology
Contributor Author

moradology commented Jan 25, 2024

Good news/bad news after further exploration. Bad first, I suppose:

Bad

I've come to the conclusion that EMR-Serverless is a non-starter. This absolutely sucks because serverless is just about the best option available for running a spark cluster with relatively little effort put into getting the infrastructure right. Spark just runs there and it generally does so faster and cheaper than a naively, manually provisioned cluster on EMR classic.

The Moby Dick job I ran there was tiny and probably only worked because it avoided serializing work over to executor nodes (👎). The PROCESS SDK harness works, and it's not too bad (just yum install git and golang) to build the boot binary in a custom image, but the expectations baked into the SparkRunner are where things get ugly: Beam wants to spin up a job server to translate pipeline work for spark to consume, but to do this it expects to have access to the scheduler. However, unlike other contexts where spark runs, the spark master is not exposed on 7077 for job submission. Environment variables that are characteristic of the issue:
[image]
Digging further, the spark-defaults contained within EMR-serverless docker images list spark.master as custom:emr-serverless. It's possible that something could be done inside the beam job runner itself to let it work nicely with this custom scheduler (yarn and mesos both work, for instance), but the inner workings of the emr-serverless spark scheduler are basically a mystery. The downside of any such approach, however, is that pangeo-forge-runner would need to be kicked off on the master node by some kind of shell script, since there's no spark master IP/port when things aren't running (which is the default state in this serverless context). This is doable; the python you'd submit would look something like this thing I used in testing:

import argparse
import subprocess

import boto3


def fetch_config_from_s3(s3_uri, local_path):
    """Fetch a file from S3 and save it locally."""
    bucket, key = parse_s3_uri(s3_uri)
    s3 = boto3.client('s3')
    s3.download_file(Bucket=bucket, Key=key, Filename=local_path)

def parse_s3_uri(s3_uri):
    """Parse an s3:// URI into bucket name and key."""
    if not s3_uri.startswith("s3://"):
        raise ValueError("Invalid S3 URI")
    parts = s3_uri[5:].split("/", 1)
    if len(parts) != 2:
        raise ValueError("Invalid S3 URI")
    return parts[0], parts[1]

def main():
    parser = argparse.ArgumentParser(description="Run pangeo forge aimed at process harness to support EMR")
    parser.add_argument('--s3_config_uri', required=True, help="S3 URI for the config file")
    args, unknown_args = parser.parse_known_args()

    local_config_path = '/tmp/pangeo_config.py'
    fetch_config_from_s3(args.s3_config_uri, local_config_path)

    # Construct the CLI command with the remaining arguments
    command = ["pangeo-forge-runner", "bake"] + unknown_args + ["-f", local_config_path]
    subprocess.run(command, check=True)

if __name__ == "__main__":
    main()

Good

As for good news:

  1. In the process of trying to figure out what can be done with serverless, I wrote some infra code and customized a docker image for EMR-serverless (with GDAL built in). Perhaps that will someday be useful if the difficulty above can find resolution: https://github.com/moradology/bEaMR-Serverless (forgive the corny name, especially given that it doesn't actually run beam reliably)

  2. EMR classic runs on YARN, so it is much closer to the anticipated SparkRunner scenario. I avoided going down this path because I wasn't familiar with the difficulties already mentioned and because EMR classic is expensive, heavy, and requires quite a bit of configuration. These things make the convenience of pangeo-forge-runner a bit less obvious. Still, there is plausibly a path forward here, should that still be desirable. Here's a bootstrap script that would be suitable for at least getting started with running beam jobs via the PROCESS sdk-harness (a sketch of the corresponding spark-submit step follows the script):

#!/bin/bash
set -ex

# Update and install necessary packages
sudo yum update -y
sudo yum install -y git golang

# Clone the specific branch of Apache Beam
git clone --branch v2.51.0 --single-branch https://github.com/apache/beam.git

# Build the Beam Python SDK container
cd beam
./gradlew :sdks:python:container:build

# Copy the boot binary to the Hadoop home
cp sdks/python/container/build/target/launcher/linux_amd64/boot /home/hadoop/boot
sudo chmod +x /home/hadoop/boot

# Clean up
cd ..
rm -rf beam
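
Closing the loop with the fat-jar route described earlier: the EMR "step" that spark-submits a bundled pipeline jar from S3 might look like this boto3 sketch (cluster id and jar path are placeholders; depending on how the jar was built, a --class argument may also be needed):

import boto3

# Sketch of the EMR step described above; placeholders throughout.
emr = boto3.client("emr")
emr.add_job_flow_steps(
    JobFlowId="<cluster-id>",
    Steps=[{
        "Name": "beam-pipeline-via-spark-submit",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",  # standard EMR wrapper for shell-style steps
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "s3://<bucket>/jars/beam-pipeline-bundle.jar",
            ],
        },
    }],
)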

@cisaacstern
Member

This is super helpful and interesting work, thanks so much for the summary @moradology. And I like the repo name!

@moradology
Contributor Author

moradology commented Feb 22, 2024

OK, so I read a bunch of Beam source and I'm starting to think that there is a path to running pipelines on EMR-Serverless (which is kind of the holy grail for doing this kind of work on AWS). It will require providing a new (non-portable, non-job-server-dependent) implementation of beam's runner class rather than using any of the off-the-shelf solutions they provide. A sketch of the relevant lines of work (a hypothetical runner skeleton follows the list):

  1. First, take a look at this example pyspark job's initialization (which works on emr-serverless): https://github.com/aws-samples/emr-serverless-samples/blob/main/examples/pyspark/extreme_weather.py#L48
    Compare it to the DaskRunner initialization which mercifully subclasses a local runner implementation: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dask/dask_runner.py#L172-L181
    They're very similar. The actual initialization of the job is 100% compatible if we follow a similar path.

  2. The next question relates to AST transformation. That, too, is far less involved than one might suppose. There really aren't that many translations necessary to get basic batch pipelines off the ground. It appears to literally just be these guys to achieve parity with the DaskRunner:

TRANSLATIONS = {
    _Create: Create,
    apache_beam.ParDo: ParDo,
    apache_beam.Map: Map,
    _GroupByKeyOnly: GroupByKey,
    _Flatten: Flatten,
}

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/dask/transform_evaluator.py
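
To make the shape of that work concrete, here is a purely hypothetical skeleton; none of these class or method names exist in Beam, it just marries the SparkSession initialization from the emr-serverless sample with a DaskRunner-style translation step:

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.runner import PipelineResult, PipelineRunner


class EmrServerlessSparkRunner(PipelineRunner):
    """Hypothetical non-portable runner sketch: no job server, just a SparkSession."""

    def run_pipeline(self, pipeline, options: PipelineOptions) -> PipelineResult:
        from pyspark.sql import SparkSession

        # The emr-serverless pyspark sample builds its session the same basic way;
        # the custom:emr-serverless scheduler is injected by the environment.
        spark = SparkSession.builder.appName("beam-on-emr-serverless").getOrCreate()  # used once translation exists

        # A real implementation would visit the pipeline graph here and apply a
        # TRANSLATIONS-style mapping (Create, ParDo, Map, GroupByKey, Flatten)
        # onto RDD/DataFrame operations, mirroring dask/transform_evaluator.py.
        raise NotImplementedError("transform translation is the open piece of work")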
