Apache Beam Jobs


Introduction

Oppia uses Apache Beam to perform large-scale datastore operations. We mostly use Apache Beam jobs as batch operations, for example to:

  • Count the number of models in the datastore.
  • Update a property across all models.
  • Delete all models that are no longer needed.
  • Delete all models that belong to one user.
  • Create stats models from other models.

Jobs can be triggered manually or automatically. Manual runs are started from the release coordinator page. Automatic runs are triggered from code; this is used together with cron to schedule jobs for a specific date and time.

If you're already familiar with Apache Beam or are eager to start writing a new job, jump to the case studies. Otherwise, you can read the whole page. If you still have questions after reading, take a look at the Apache Beam Programming Guide for more details.

Apache Beam Job Architecture

Conceptually, an Apache Beam job is just a bunch of steps, each of which transforms some input data into some output data. For example, if you wanted to count how many interactions are in all of Oppia's explorations, you could break that task down into a series of transformations:

flowchart LR
    A(Explorations) -->|Count interactions| B(Counts) -->|Sum| C(Total)

For more complicated tasks, Apache Beam supports tasks whose transformations form a directed acyclic graph, or "DAG." These are just graphs with no cycles. For example, if you wanted to find the ratio of interactions to cards, you could use this DAG:

flowchart TD
E(Explorations) -->|Count interactions| C(Count) -->|Sum| NI(Num Interactions)
E -->|Count cards| C2(Count) -->|Sum| NC(Num Cards)
NC --> |Divide| IC(Interactions / Cards)
NI --> |Divide| IC(Interactions / Cards)

Note that the first example we saw, while linear, is still a DAG!

In Apache Beam, all jobs are represented as these DAGs. The nodes are represented as PValue objects, and the edges are represented as PTransform objects. Pipeline objects manage the DAGs, and Runner objects actually execute the jobs.

Next, we'll look at each of these components in more detail.

Pipelines

Pipelines manage the "DAG" of PValues and the PTransforms that compute them.

For example, here's a schematic representation of a Pipeline that counts the number of occurrences of every word in an input file and writes those counts to an output file:

flowchart TD
IF(Input File) -->|"io.ReadFromText(fname)"| L(Lines) -->|"FlatMap(str.split)"| W(Words) -->|"combiners.Count.PerElement()"| WC(word counts) --> |"MapTuple(lambda word, count: '%s: %d' % (word, count))"|w("word: #"s)
w -->|"io.WriteToText (ofname)"| OF(Output File)

Here's the code for this job:

class WordCountJob(base_jobs.JobBase):
    def run(self, fname, ofname):
        return (
            self.pipeline
            | 'Generate Lines' >> beam.io.ReadFromText(fname)
            | 'Generate Words' >> beam.FlatMap(str.split)
            | 'Generate (word, count)s' >> beam.combiners.Count.PerElement()
            | 'Generate "word: #"s' >> (
                beam.MapTuple(lambda word, count: '%s: %d' % (word, count)))
            | 'Write to Output File' >> beam.io.WriteToText(ofname)
        )

You might be wondering what's going on with the | and >> operators. In Python, classes can override how operators behave on their instances. Apache Beam overrides the | and >> operators, so | no longer performs an OR operation. Instead, | applies a PTransform to a PCollection (via the pipeline's apply() method) to create a new PCollection, and >> lets you name a PTransform step, which helps document your job. Note that at the very beginning, we also use | between the pipeline object and a PTransform to start building the job.
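
For example, here's a minimal sketch (assuming words is a PCollection of strings, like the one produced by the 'Generate Words' step above); both lines apply the same PTransform, but the second attaches a human-readable label via >>:

word_counts = words | beam.combiners.Count.PerElement()
# ...or, equivalently, with an explicit name for the step:
word_counts = words | 'Generate (word, count)s' >> beam.combiners.Count.PerElement()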

PValues

PCollections are the primary input and output PValues used by PTransforms. They are a kind of PValue that represent a dataset of (virtually) any size, including unbounded and continuous datasets.

PBegin and PDone are "terminal" PValues that signal that the value cannot be produced by an operation (PBegin) or that no operation can act on the value (PDone). For example, a Pipeline object acts as a PBegin, and the output of a write operation is a PDone.

PTransforms

Recall that PTransforms represent the "edges" of the DAG and convert PValues into other PValues.

ParDo and DoFn

ParDo is the most flexible PTransform. It accepts a DoFn (a class whose process() method defines the work to perform on each element) and applies it to every element of the input PCollection in parallel. It also accepts plain functions and lambdas. It is analogous to the following code:

do_fn = DoFn()
for value in pcoll:
    do_fn.process(value)

Note that, in this simple analogy, the values returned by the DoFn are discarded. In practice, anything the DoFn's process() method yields becomes an element of ParDo's output PCollection, and a DoFn can also hold onto state in more advanced implementations.
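
For instance, here's a minimal sketch of a custom DoFn and how it would be applied with ParDo (ExtractWordsDoFn and lines are illustrative names, not from the Oppia codebase):

import apache_beam as beam


class ExtractWordsDoFn(beam.DoFn):
    """Illustrative DoFn that emits every word of an input line."""

    def process(self, element):
        # process() may yield zero or more output elements per input element;
        # everything yielded becomes part of ParDo's output PCollection.
        for word in element.split():
            yield word


words = lines | 'Extract words' >> beam.ParDo(ExtractWordsDoFn())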

Map and FlatMap

beam.Map is an operation that transforms each item in a PCollection into a new value using a plain-old function. It is analogous to the following code (where fn is the transformation function):

new_pcoll = []
for value in pcoll:
    new_pcoll.append(fn(value))
return new_pcoll

beam.FlatMap is a similar transformation, but its function returns an iterable for each input element, and FlatMap flattens all of those iterables into a single output PCollection. It is analogous to the following code (where fn is the transformation function):

new_pcoll = []
for value in pcoll:
    for sub_value in fn(value):
        new_pcoll.append(sub_value)
return new_pcoll
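
As a small hedged sketch of the difference between the two (the element values are made up):

with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create(['a b', 'c'])
    # Map produces exactly one output per input: ['A B', 'C'].
    upper_lines = lines | 'Upper-case lines' >> beam.Map(str.upper)
    # FlatMap flattens each returned iterable: ['a', 'b', 'c'].
    words = lines | 'Split into words' >> beam.FlatMap(str.split)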

Filter

beam.Filter returns a new PCollection containing only those elements of the input PCollection for which a specified filtering function returns True. It is analogous to the following code (for filtering function fn):

new_pcoll = []
for value in pcoll:
    if fn(value):
        new_pcoll.append(value)
return new_pcoll
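
For example, a minimal sketch with made-up values (assuming pipeline is an in-scope beam.Pipeline):

# Keeps only the even numbers, producing a PCollection of [2, 4].
even_numbers = (
    pipeline
    | beam.Create([1, 2, 3, 4])
    | 'Keep evens' >> beam.Filter(lambda value: value % 2 == 0)
)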

GroupByKey

beam.GroupByKey is useful when you need to perform an operation on elements that share a common property. It takes (key, value) elements as input and returns a mapping from each key to all of the values that were associated with that key. When several (key, value) PCollections are grouped together at once (as in the validation example below), the values are additionally bucketed by the PCollection they came from. It is analogous to the following code:

groups = collections.defaultdict(lambda: collections.defaultdict(list))
for i, pcoll in enumerate(pcolls_to_group):
    # NOTE: Each PCollection must have (key, value) pairs as elements.
    for key, value in pcoll:
        # Items from each PCollection are grouped under the same key and
        # bucketed into their corresponding index.
        groups[key][i].append(value)
return groups
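
Here's a small hedged sketch of GroupByKey on a single PCollection (the values are made up; pipeline is assumed to be an in-scope beam.Pipeline):

grouped = (
    pipeline
    | beam.Create([('a', 1), ('b', 2), ('a', 3)])
    | 'Group by letter' >> beam.GroupByKey()
)
# `grouped` pairs each key with an iterable of its values, roughly
# ('a', [1, 3]) and ('b', [2]); the order of values within a group is
# not guaranteed.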

Example of using GroupByKey, Filter, and FlatMap

For example, in our validation jobs we compute two PCollections:

# Tuples of (ModelKey, True) for each model in the datastore that exists.
existing_models_pcoll = ...
# Tuples of (ModelKey, str) for each error message that should be reported when
# the corresponding model instance does not exist.
errors_if_missing_pcoll = ...

To generate a report, we use GroupByKey to pair the messages to the existing models.

After this step, we can filter out the pairs where a model existed and report the errors that are left over.

error_pcoll = (
    (
        # A PCollection of Tuple[ModelKey, bool]. A ModelKey identifies an
        # individual model in the datastore.
        existing_models_pcoll,
        # A PCollection of Tuple[ModelKey, str]. Each item corresponds to an
        # error that should be reported when the corresponding instance does not
        # exist.
        errors_if_missing_pcoll,
    )
    # Returns a PCollection of Tuple[ModelKey, Tuple[List[bool], List[str]]].
    | beam.GroupByKey()
    # Discards ModelKey from the PCollection.
    | beam.Values()
    # Only keep groupings that indicate that the model is missing.
    | beam.Filter(lambda group: not any(group[0]))
    # Discard the bools and flatten the results into a PCollection of strings.
    | beam.FlatMap(lambda group: group[1])
)

Runners

Runners provide the run() method used to visit every node (PValue) in the pipeline's DAG by executing the edges (PTransforms) to compute their values. At Oppia, we use DataflowRunner to have our Pipelines run on the Google Cloud Dataflow service.

Writing Apache Beam Jobs

For this section, we'll walk through the steps of implementing a job by writing one: CountExplorationStatesJob.

It's helpful to begin by sketching a diagram of what you want the job to do. We recommend using pen and paper or a whiteboard, but in this wiki page we'll use ASCII art to keep the document self-contained.

Here's a diagram for the CountExplorationStatesJob:

flowchart LR
    A(Explorations) -->|Count states| B(Counts) -->|Sum| C(Total)

TIP: As illustrated, you don't need to know the names of the PTransforms (edges) when drawing the diagram. It's easy to look up the appropriate PTransform afterwards.

Now that we have our bearings, let's get started on implementing the job.

1. Subclass the base_jobs.JobBase class and override the run() method

Make sure your job class name is clear and concise, because the name is presented to release coordinators:

Screenshot of the Release Coordinator page with a list of job names visible

Job names should follow the convention: <Verb><Noun>Job.

  • For example:

    class WeeklyDashboardStatsComputationJob(base_jobs.JobBase):
        """BAD: Name does not begin with a verb."""
    
        def run(self):
            ...
    
    
    class ComputeStatsJob(base_jobs.JobBase):
        """BAD: Unclear what kind of stats are being computed."""
    
        def run(self):
            ...
    
    
    class CountExplorationStatesJob(base_jobs.JobBase):
        """GOOD: Name starts with a verb and "exploration states" is unambiguous."""
    
        def run(self):
            ...

Module names should follow the convention: <noun>_<operation>_jobs.py.

  • For example:
    • blog_validation_jobs.py

    • dashboard_stats_computation_jobs.py

    • exploration_indexing_jobs.py

    • exploration_stats_regeneration_jobs.py

    • model_validation_jobs.py

      However, if an appropriate module already exists, prefer adding your job to it rather than creating a new one.

For this example, we will write our job in the module: core/jobs/batch_jobs/exploration_inspection_jobs.py.

2. Override the run() method to operate on self.pipeline

As illustrated in the Architecture section, jobs are organized by Pipelines, PTransforms, and PCollections. Jobs that inherit from JobBase are constructed with a Pipeline object already accessible via self.pipeline. When we write our jobs, we will build them off of self.pipeline.

Pipelines are special PValues that represent the entry-point of a job. PTransforms that operate on Pipeline are generally "producers"; that is to say, operations that produce initial PCollections to work off of.

We can represent this in our DAG by adding a special Pipeline node.

flowchart TD
P(Pipeline) -->|"GetModels()"| E(Explorations) -->|"Count states"| C(Counts) -->|"Sum"| T(Total)

Note

Since pipelines are a part of every job, it's fine to leave them out of a DAG diagram to reduce clutter.

Now, let's see how this would translate into code, starting with the Explorations.

from core.jobs import base_jobs
from core.jobs.io import ndb_io
from core.platform import models

(exp_models,) = models.Registry.import_models([models.NAMES.exploration])


class CountExplorationStatesJob(base_jobs.JobBase):

    def run(self):
        exp_model_pcoll = (
            self.pipeline
            | 'Get all ExplorationModels' >> ndb_io.GetModels(
                exp_models.ExplorationModel.get_all())
        )

Observe that:

  1. We're using ndb_io.GetModels rather than get_multi
  2. We're passing a Query to ndb_io.GetModels

We use ndb_io.GetModels() because we want to work on PCollections of models, not a list of models. In fact, all operations that can be taken on models (get, put, delete) have analogous PTransform interfaces defined in ndb_io. They are:

NDB function → PTransform analogue:

  • models = get_multi() → model_pcoll = ndb_io.GetModels(Query(...))
  • put_multi(models) → model_pcoll | ndb_io.PutModels()
  • delete_multi(keys) → key_pcoll | ndb_io.DeleteModels()

Note that the get analogue has the biggest change in interface: ndb_io.GetModels takes a Query argument rather than a list of keys. You can get a query for any model class by using its get_all class method.

IMPORTANT: Never use datastore_services.query_everything()!! Due to a limitation in Apache Beam, this operation is incredibly slow and inefficient! You are almost certainly doing something wrong if you need this function. Ask @brianrodri/@vojtechjelinek for help if you believe you need to use it regardless.

Why should we use these PTransforms over the simpler get/put/delete functions? Performance. The get/put/delete calls are all synchronous, so your job would spend much of its time waiting for those operations to complete.

PTransforms, on the other hand, are specially crafted to take advantage of the Apache Beam framework and generally perform much better. In general, you should always prefer ndb_io over any of the get/put/delete functions! If you think you have a valid need for avoiding ndb_io, then speak with @brianrodri/@vojtechjelinek first.

Let's get back to implementing the job.

def run(self):
    exp_model_pcoll = (
        self.pipeline
        | 'Get all ExplorationModels' >> ndb_io.GetModels(
            exp_models.ExplorationModel.get_all())
    )

    state_count_pcoll = (
        exp_model_pcoll
        | 'Count states' >> beam.Map(self.get_number_of_states)
    )

Note that we chose to use beam.Map here instead of beam.ParDo. This is mostly a stylistic choice: beam.Map is just a specialized version of ParDo that "maps" each input element to exactly one output element. In our case, each ExplorationModel maps to a single int, the number of states.

Here is the implementation of get_number_of_states. This function transforms the model into a domain object, and then counts the number of states in the corresponding dict.

def get_number_of_states(self, model: ExplorationModel) -> int:
    exploration = exp_fetchers.get_exploration_from_model(model)
    return len(exploration.states)

Finally, we need to sum all the counts together. We'll use beam.CombineGlobally to accomplish this, which uses an input function to combine values of a PCollection. It returns a PCollection with a single element: the result of the combination.

def run(self):
    exp_model_pcoll = (
        self.pipeline
        | 'Get all ExplorationModels' >> ndb_io.GetModels(
            exp_models.ExplorationModel.get_all())
    )

    state_count_pcoll = (
        exp_model_pcoll
        | 'Count states' >> beam.Map(self.get_number_of_states)
    )

    state_count_sum_pcoll = (
        state_count_pcoll
        | 'Sum values' >> beam.CombineGlobally(sum)
    )
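
To see CombineGlobally's behavior in isolation, here's a minimal hedged sketch with made-up numbers (assuming pipeline is an in-scope beam.Pipeline):

total = (
    pipeline
    | beam.Create([3, 5, 2])
    | 'Sum values' >> beam.CombineGlobally(sum)
)
# `total` is a PCollection containing a single element: 10.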

IMPORTANT: We take special care to pass very simple objects (like ints and models) in between PTransforms. This is intentional: complex objects cannot be serialized without special care (TL;DR: objects must be picklable). When passing objects between PTransforms in your jobs, use simple data structures and simple types as much as possible.

With this, our objective is complete. However, there's still more code to write!

3. Have the run() method return a PCollection of JobRunResults

  • In English, this means that the job must report something about what occurred during its execution. For example, this can be the errors it discovered or the number of successful operations it was able to perform. Empty results are forbidden!

    • If you don't think your job has any results worth reporting, then just print a "success" metric with the number of models it processed.
  • JobRunResult has two fields: stdout and stderr. They are analogous to a program's output streams and should be used in a similar capacity for jobs: put problems encountered by the job in stderr and informational outputs in stdout (a small sketch of this appears after this list).

  • JobRunResult outputs should answer the following questions:

    • Did the job run without any problems? How and why do I know?
    • How much work did the job manage to do?
    • If the job encountered a problem, what caused it?
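
For instance, here's a hedged sketch of constructing results directly (the messages are illustrative, not prescribed formats):

# An informational result and an error result, respectively.
ok_result = job_run_result.JobRunResult(stdout='SUCCESS: processed 123 models')
bad_result = job_run_result.JobRunResult(stderr='Model abc failed validation')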

Our job is trying to report the total number of states across all explorations, so we need to create a JobRunResult that holds that information. For this, we can use the as_stdout helper method:

def run(self):
    exp_model_pcoll = (
        self.pipeline
        | 'Get all ExplorationModels' >> ndb_io.GetModels(
            exp_models.ExplorationModel.get_all())
    )

    state_count_pcoll = (
        exp_model_pcoll
        | 'Count states' >> beam.Map(self.get_number_of_states)
    )

    state_count_sum_pcoll = (
        state_count_pcoll
        | 'Sum values' >> beam.CombineGlobally(sum)
    )

    return (
        state_count_sum_pcoll
        | 'Map as stdout' >> beam.Map(job_run_result.JobRunResult.as_stdout)
    )

This method maps every element of the PCollection to a JobRunResult whose stdout is the element's stringified value.
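
As a rough sketch of what this step does to each element (the value 42 is made up):

# Each element is stringified and wrapped, so a sum of 42 becomes
# (approximately) job_run_result.JobRunResult(stdout='42').
result = job_run_result.JobRunResult.as_stdout(42)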

4. Add the job module to core/jobs/registry.py

To have your job registered and acknowledged by the front-end, make sure to import the module in the corresponding section of core/jobs/registry.py: https://github.com/oppia/oppia/blob/973f777a6c5a8c3442846bda839e63856dfddf72/core/jobs/registry.py#L33-L50


With this, our job is finally completed!

Here is the cleaned-up implementation of our job:

from core.domain import exp_fetchers
from core.jobs import base_jobs
from core.jobs.io import ndb_io
from core.jobs.types import job_run_result
from core.platform import models

import apache_beam as beam

(exp_models,) = models.Registry.import_models([models.NAMES.exploration])


class CountExplorationStatesJob(base_jobs.JobBase):

    def run(self) -> beam.PCollection[job_run_result.JobRunResult]:
        return (
            self.pipeline
            | 'Get all ExplorationModels' >> ndb_io.GetModels(
                exp_models.ExplorationModel.get_all())
            | 'Count states' >> beam.Map(self.get_number_of_states)
            | 'Sum values' >> beam.CombineGlobally(sum)
            | 'Map as stdout' >> beam.Map(job_run_result.JobRunResult.as_stdout)
        )

    def get_number_of_states(self, model: exp_models.ExplorationModel) -> int:
        exploration = exp_fetchers.get_exploration_from_model(model)
        return len(exploration.states)

Testing Apache Beam Jobs

First and foremost, you should follow our guidelines for writing backend tests. This includes naming your test cases (test_{{action}}_with_{{with_condition}}_{{has_expected_outcome}}) and our general test case structure ("Setup", "Baseline verification", "Action", "Endline verification").

There are two base classes dedicated to testing our Apache Beam jobs: PipelinedTestBase and JobTestBase.

PipelinedTestBase (and its subclass, JobTestBase) exposes two special assertion methods: assert_pcoll_equal and assert_pcoll_empty.

The class operates as follows: in setUp(), it enters the context of a Pipeline object (accessible via self.pipeline); when that context exits, the Pipeline executes every operation attached to it. Calling an assert_pcoll_* method adds a "verification" PTransform to the input PCollection and then closes the context (thus running the pipeline immediately). For this reason, only one assert_pcoll_* method may be called in a test case! If you want to run multiple assertions on a PCollection, create a separate test case for each one.

Note

The verification PTransform will also run type checks on all inputs/outputs generated by the PTransforms under test!

Here's an example:

def test_validate_model_id_with_invalid_model_id_reports_an_error(self):
    # Setup.
    invalid_id_model = base_models.BaseModel(
        id='123@?!*',
        created_on=self.YEAR_AGO,
        last_updated=self.NOW)

    # Action.
    output = (
        self.pipeline
        | beam.Create([invalid_id_model])
        | beam.ParDo(base_validation.ValidateBaseModelId())
    )

    # Endline verification.
    self.assert_pcoll_equal(output, [
        base_validation_errors.ModelIdRegexError(
            invalid_id_model,
            base_validation.BASE_MODEL_ID_PATTERN),
    ])

For testing jobs, follow these steps (we'll use CountExplorationStatesJob as an example):

1. Inherit from JobTestBase and override the class constant JOB_CLASS

The current convention is to name your test cases <JobName>Tests, but you can create better names if you want to break tests up. For our example, we'll keep things simple.

class CountExplorationStatesJobTests(test_jobs.JobTestBase):

    JOB_CLASS = CountExplorationStatesJob

2. Run assertions using an assert_job_output_is_* method

When testing a job, we should aim to cover behavior and common edge cases. For this job, we'll have 3 main tests:

  1. When there are no Explorations in the datastore.
  2. When there is exactly 1 Exploration in the datastore.
  3. When there are many Explorations in the datastore.

def test_empty_datastore(self):
    # Don't add any explorations to the datastore.
    self.assert_job_output_is_empty()

def test_single_exploration(self):
    self.save_new_linear_exp_with_state_names_and_interactions(
        'e1', 'o1', ['A', 'B', 'C'], ['TextInput'])

    self.assert_job_output_is([
        job_run_result.JobRunResult(stdout='3'),
    ])

def test_many_explorations(self):
    self.save_new_linear_exp_with_state_names_and_interactions(
        'e1', 'o1', ['A', 'B', 'C'], ['TextInput'])
    self.save_new_linear_exp_with_state_names_and_interactions(
        'e2', 'o1', ['D', 'E', 'F', 'G', 'H'], ['TextInput'])
    self.save_new_linear_exp_with_state_names_and_interactions(
        'e3', 'o1', ['I', 'J'], ['TextInput'])
    self.save_new_linear_exp_with_state_names_and_interactions(
        'e4', 'o1', ['K', 'L', 'M', 'N'], ['TextInput'])

    self.assert_job_output_is([
        job_run_result.JobRunResult(stdout='14'),
    ])

Note that self.assert_job_output_is(...) and self.assert_job_output_is_empty() do as advertised -- they run the job to completion and verify the result.

IMPORTANT: Only one assert_job_output_is assertion can be performed in a test body. Multiple calls will result in an exception instructing you to split the test apart.

Just because a job passes its unit tests does not guarantee that it will pass in production. This is because workers, which execute the pipeline code, run in a special environment where the code base is configured differently. While Oppia's jobs team works to resolve the differences, be careful about using complex and/or confusing objects. The simpler your job, the greater the chance it'll work in production!

Running Apache Beam Jobs

Local Development Server

These instructions assume you are running a local development server. If you are a release coordinator running these jobs on the production or testing servers, you should already have been granted the "Release Coordinator" role, so you can skip steps 1-3.

  1. Sign in as an administrator (instructions).
  2. Navigate to Admin Page > Roles Tab.
  3. Add the "Release Coordinator" role to the username you are signed in with.
  4. Navigate to http://localhost:8181/release-coordinator, then to the Beam Jobs tab.
  5. Search for your job and then click the Play button.
  6. Click "Start new job".

Screen recording showing how to run jobs

Production Server

Before a job can be run and deployed in production, it must first be tested on the Oppia backup server.

If your job is not essential for the release and has not been fully tested by the release cut, then it is not going into the release. "Fully tested" means:

  • The job should run without failures on the Oppia backup server.
  • The job produces the expected output.
  • The job has the expected outcome (this must be verified by e.g. user-facing changes, or a validation job, or an output check, etc.).
  • The job should be explicitly approved by the server jobs admin (currently @seanlip and @vojtechjelinek).

Also, if your job changes data in the datastore, it must be accompanied by a validation job that verifies that the data you are changing is valid on the server. The validation job has to be "fully tested" before testing of the migration job can start.

If invalid data is observed, either your migration job should fix it programmatically, or the corresponding data has to be fixed manually before the migration job can be run. This applies both to testing on the backup server and to running in production.

For a full overview of the process to get your job tested on the Oppia backup server, refer to the corresponding wiki page.

Instruction for job testers

There are two ways to test Beam jobs; both need to be done by a person who has deploy access to the backup server.

Deploy to backup server

This way provides full testing of the job.

  1. Deploy the branch containing the job to the backup server.
  2. Run the job through the release coordinator page.
  3. If the job fails, you can check the details of the error on the Google Cloud Console in the Dataflow section.

Run the job on the backup server through the local dev server

The downside of this approach is that you cannot get the job output; you can only verify that the job runs.

In order to run jobs through the local dev server, you need a JSON key that provides access to the backup server. The key can be generated according to steps 4 and 5 of the Quickstart for Python.

  1. Have the JSON key ready.
  2. Add GOOGLE_APPLICATION_CREDENTIALS: "<absolute path to JSON key>" to env_variables in app_dev.yaml.
  3. In core/domain/beam_job_services.py, change the value of run_synchronously to False.
  4. In core/feconf.py:
     • Change the value of OPPIA_PROJECT_ID to 'oppiaserver-backup-migration'.
     • Change the value of DATAFLOW_TEMP_LOCATION to 'gs://oppiaserver-backup-migration-beam-jobs-temp/'.
     • Change the value of DATAFLOW_STAGING_LOCATION to 'gs://oppiaserver-backup-migration-beam-jobs-staging/'.
  5. Start the dev server and run the job through the release coordinator page.
  6. If the job fails, you can check the details of the error on the Google Cloud Console in the Dataflow section.

Beam guidelines

Do not use NDB put/get/delete directly

Even though it is possible to use NDB functions directly, they should not be used because they are slow and we have Beam-compliant alternatives for them. All of these alternatives are located in core/jobs/io/ndb_io.py.

  • Instead of using get, get_multi, get_by_id, etc., use GetModels and pass a query to it. GetModels will execute the query and return a PCollection of the models that match it.
  • Instead of using put, put_multi, etc., use PutModels. Just pipe a PCollection of models into it and they will be put into the datastore.
  • Instead of using delete, delete_multi, etc., use DeleteModels. Just pipe a PCollection of model keys into it and the corresponding models will be deleted from the datastore.

All of the aforementioned classes are already used in the codebase so you can look for examples.
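
For instance, here's a hedged sketch of writing updated models back to the datastore (assuming updated_models_pcoll is a PCollection of NDB models produced earlier in the job):

unused_put_result = (
    updated_models_pcoll
    | 'Put updated models into the datastore' >> ndb_io.PutModels()
)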

Use get_package_file_contents for accessing files

If you need to access a file in a Beam job, please use get_package_file_contents (from core/constants.py) instead of open or open_file (from core/utils.py). Also, make sure that the file is included in the assets folder or is listed in MANIFEST.in explicitly.

Example

When we have a function that is used in a Beam pipeline, like:

@staticmethod
def function_used_in_beam_pipeline():
    file = utils.open_file('assets/images/about/cc.svg', 'rb')
    return file.read()

it needs to be replaced with something like:

@staticmethod
def function_used_in_beam_pipeline():
    return constants.get_package_file_contents(
        'assets', 'images/about/cc.svg', binary_mode=True
    )

Common Beam errors

'_UnwindowedValues' object is not subscriptable error

This error usually happens when you attempt to index into something you expect to be a list, but which Beam has not actually converted to a list. The usual solution is to transform the element into a list explicitly using list(). Some comments on the internet suggest using SessionWindow or similar; since all our jobs are batch jobs that process a finite collection of elements, that solution does not apply.

Example

new_user_stats_models = (
    {
        'suggestion': suggestions_grouped_by_target,
        'opportunity': exp_opportunities
    }
    | 'Merge models' >> beam.CoGroupByKey()
    | 'Get rid of key' >> beam.Values()  # pylint: disable=no-value-for-parameter
    | 'Generate stats' >> beam.ParDo(
        lambda x: self._generate_stats(
            x['suggestion'][0] if len(x['suggestion']) else [],
            x['opportunity'][0][0] if len(x['opportunity']) else None
        ))
)

The code above throws the error '_UnwindowedValues' object is not subscriptable [while running 'Generate stats'], so we know that the issue is in the last part of the code (the 'Generate stats' step). After some debugging, we discover that the code needs to be changed to:

| 'Generate stats' >> beam.ParDo(
    lambda x: self._generate_stats(
        x['suggestion'][0] if len(x['suggestion']) else [],
        list(x['opportunity'][0])[0] if len(x['opportunity']) else None
    ))

_namedptransform is not iterable error

This error sometimes happens when you forget to add a label to some operation (the string before >>). The solution is to add a label to every operation.

Example

some_values = (
    some_models
    | beam.Values()
)

The code above might produce '_namedptransform is not iterable' in the job output. We can fix this by adding an appropriate label:

some_values = (
    some_models
    | 'Get values' >> beam.Values()
)

Guidelines for writing Beam jobs

This section provides some general guidelines for writing Beam jobs, which would be particularly helpful for new contributors.

Planning a job

  • If your Beam job includes updating the storage models, make sure to also write an audit job. An audit job is similar to your Beam job but doesn't make any changes to the datastore. During testing, the audit job is run before the actual Beam job to prevent any unwanted changes to the datastore in case of an error. An example that illustrates this paradigm is given below.
  • There should also be a job to verify the changes made by your job.
  • Consider the example of the topic migration job: AuditTopicMigrateJob is the audit job, which performs all the steps of the main job (MigrateTopicJob) except writing those changes to the datastore.
  • It is often helpful to include debugging information with your job from the outset, since modifying a failed job and rerunning it can take time. Feel free to include additional logs or counts that will help you debug any issues that arise during execution. To help you identify these easily, you can prefix the relevant lines of output with an identifier in square brackets (e.g. [NUMBER OF MODELS PROCESSED]).

Executing jobs

  • Refer to code from similar jobs to avoid mistakes. The "Troubleshooting" section in the wiki lists common errors encountered while executing jobs.
  • Make sure to write tests for all jobs. These tests should check the behaviour of the job for different cases of inputs. The common cases for all jobs are listed below:
    • Empty input.
    • Input which triggers the job to perform its intended action.
    • Incorrect input which causes the job to fail.
  • The tests should ensure that the job runs appropriately for all such cases.

PR guidelines

  • A PR for a Beam job should always have "Proof of work" in the description, which should include a successful local run of the Beam job on the release coordinator page and a screenshot of the storage model that is being changed. The storage models can be checked locally via dsadmin.

Job testing workflow

  • A testing request for the job must be submitted as soon as the PR is created using this form. Make sure to be clear in your instructions (in the doc) about what exactly needs to be done by the tester. Also, the doc link should be included in the PR description.
  • The sample job testing template should be used as a starting point for the testing doc. Make sure to make a copy before editing the doc.
  • If multiple jobs need to be tested, list them in the order in which they should be tested. Generally, an audit job is run before the migration job to verify that the job runs as intended.
  • The instructions should be concise to prevent any confusion for the tester. Any pre and post checks to be done should also be specified clearly.
  • After the doc is complete, request a review from the main code reviewer of the related PR. The reviewer should fill in the approval section at the top of the doc.
  • Once approvals have been received, the assigned server admin will then test the PR according to the instructions in the doc and update the PR author with the results. In case of errors, the server admin would provide the relevant logs for debugging.

Case Studies

The case studies are sorted in order of increasing complexity. Study the one that best suits your needs.

If none of them help you implement your job, you may request a new one by adding a comment to #13190 with answers to the following questions:

  • Why do I want a new case study?
  • Why are the current case studies insufficient?
  • What answers would the "perfect" case study provide?

Then we'll start writing a new Case Study to help you, and future contributors, as soon as we can (@brianrodri will always let you know how long it'll take).

Case Study: SchemaMigrationJob

Difficulty: Medium

Key Concepts:

  • Getting and Putting NDB models
  • Partitioning one PCollection into many PCollections.
  • Returning variable outputs from a DoFn

Let's start by listing the specification of a schema migration job:

  • We can assume:

    • The schema version of a model is in the closed range [1, N], where N is the latest version.
    • All migration functions are implemented in terms of taking n to n + 1.
  • Our job should conform to the following requirements:

    • Models should only be put into storage after successfully migrating to vN.
    • Models that were already at vN should be reported separately.

flowchart TD
IM(Input Models) -->|"Partition(lambda model: model.schema_version)"| MV("Model @v1")
IM -->|"Partition(lambda model: model.schema_version)"| M(Model ...)
MV -->|"ParDo(MigrateToNextVersion())"| M
MV ---- M
M --> MvN("Model @vN")
MvN -->|"ndb_io.PutModels()"| D(Datastore)

There's a lot of complexity here, so we'll need many PTransforms to write our job. We'll focus on the most interesting one: the loop to migrate models to the next version.

class MigrateToNextVersion(beam.DoFn):

    def process(self, input_model):
        if input_model.schema_version < ExplorationModel.LATEST_SCHEMA_VERSION:
            model = job_utils.clone_model(input_model)
            exp_services.migrate_to_next_version(model)
            yield model


class MigrateToLatestVersion(beam.PTransform):
    """Diagram:

    .--------------. Partition(lambda model: model.schema_version)
    | Input Models | ---------------------------------------------.
    '--------------'                                              |
                                                 .-----------.    |
                        .----------------------- | Model @v1 | <--|
                        |                        '-----------'    |
                        |                                         |
                        | ParDo(MigrateToNextVersion())           |
                         >-----------------------------.          |
                        |                              |          |
                        |                              v          |
                        |                        .-----------.    |
                        '----------------------- | Model ... | <--'
                                                 '-----------'
                                                       |
                                                       v
                                                 .-----------.
                                                 | Model @vN |
                                                 '-----------'
    """

    def expand(self, exp_model_pcoll):
        models_by_schema_version = (
            exp_model_pcoll
            | beam.Partition(
                lambda model, _: model.schema_version - 1,
                ExplorationModel.LATEST_SCHEMA_VERSION)
        )

        do_fn = MigrateToNextVersion()
        results = [
            models_by_schema_version[0]
            | 'Migrate v1 models' >> beam.ParDo(do_fn)
        ]

        for i, models_at_ith_version in enumerate(
                models_by_schema_version[1:-1], start=2):
            models_to_migrate = (
                (results[-1], models_at_ith_version)
                | 'Combine v%d models' % i >> beam.Flatten()
            )
            results.append(
                models_to_migrate
                | 'Migrate v%d models' % i >> beam.ParDo(do_fn))

        # The final result holds every model that was migrated up to vN.
        return results[-1]

Note that this implementation won't work as-is since we focused on the step where we upgrade the models. To get this fully working, we'd need to write a Pipeline that handles loading in the models and writing the upgraded models back to the datastore.
