Beam backend: use TypedPipe descriptions as names for PTransforms #1983

Open
wants to merge 3 commits into
base: develop

navinvishy
Contributor

Problem: When debugging Beam jobs running on Dataflow, it is hard to map a failure in the Dataflow UI back to the corresponding line in the Scalding code.

Solution: Assign the description of the TypedPipe as the name of the PTransform (this name appears in the Dataflow UI). This allows mapping the error back to the line in the Scalding code, since by default the descriptions contain the line numbers.

In order to make this work, I changed the Beam backend optimization rules to not use the DescribeLater rule, which moves a WithDescriptionTypedPipe higher in the AST. This was necessary in order to preserve the location of the description in the AST and apply it to the right PTransform in Beam. This possibly introduces a few unintended consequences for the optimizer:

  • Some composition rules such as ComposeMap, ComposeFlatMap, etc. may no longer apply, since they rely on DescribeLater having run in an earlier phase. While such optimizations don't seem to exist in Beam, Dataflow does have something along these lines. So this should ideally not have any performance implications, at least in Dataflow.
  • DescribeLater also allows ComposeDescriptions to work better, which reduces the number of nodes in the AST. It's possible that removing DescribeLater makes the AST very large.

@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


Navin Viswanath seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

@codecov-commenter

codecov-commenter commented Apr 8, 2022

Codecov Report

Merging #1983 (769359a) into develop (e44f2f2) will increase coverage by 2.17%.
The diff coverage is 86.36%.

@@              Coverage Diff              @@
##             develop    #1983      +/-   ##
=============================================
+ Coverage      35.74%   37.92%   +2.17%     
- Complexity      1079     1153      +74     
=============================================
  Files            316      316              
  Lines          20941    21147     +206     
  Branches        2850     2919      +69     
=============================================
+ Hits            7485     8019     +534     
+ Misses         12557    12136     -421     
- Partials         899      992      +93     
Impacted Files Coverage Δ
...om/twitter/scalding/beam_backend/BeamBackend.scala 72.97% <86.36%> (+1.69%) ⬆️
...ala/com/twitter/scalding/beam_backend/BeamOp.scala 83.87% <86.36%> (+0.46%) ⬆️
...rc/main/scala/com/twitter/scalding/TypedJson.scala 0.00% <0.00%> (ø)
...ain/scala/com/twitter/scalding/dagon/Literal.scala 39.28% <0.00%> (ø)
...tter/scalding/db/macros/impl/JdbcFieldSetter.scala 0.00% <0.00%> (ø)
...lding/db/macros/impl/handler/DateTypeHandler.scala 0.00% <0.00%> (ø)
...ding/db/macros/impl/handler/AnnotationHelper.scala 0.00% <0.00%> (ø)
...ing/db/macros/impl/handler/StringTypeHandler.scala 0.00% <0.00%> (ø)
...ng/db/macros/impl/handler/NumericTypeHandler.scala 0.00% <0.00%> (ø)
.../db/macros/impl/ColumnDefinitionProviderImpl.scala 0.00% <0.00%> (ø)
... and 53 more

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update e44f2f2...769359a. Read the comment docs.

Collaborator

@johnynek johnynek left a comment

Just had a couple of minor comments.

rec[a](wd.input)
case (wd: WithDescriptionTypedPipe[_], rec) => {
val op = rec(wd.input)
op.withName(wd.descriptions.map(_._1).head)
Collaborator

.head can throw. Can we instead do:

wd.descriptions match {
  case head :: _ =>
    op.withName(head._1)
  case Nil =>
    op
}
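
The same guard can be written with `headOption`. The following is only a minimal sketch: `Op` is a hypothetical stand-in for the Beam op in this PR, and the descriptions are assumed to be (text, flag) pairs, as the `_._1` access in the diff suggests.

```scala
object SafeName {
  // Hypothetical stand-in for an op that can carry an optional name.
  final case class Op(id: String, name: Option[String] = None) {
    def withName(n: String): Op = copy(name = Some(n))
  }

  // Name the op after the first description if there is one;
  // an empty list leaves the op unchanged instead of throwing.
  def named(op: Op, descriptions: List[(String, Boolean)]): Op =
    descriptions.headOption.fold(op) { case (text, _) => op.withName(text) }
}
```

Unlike `.head`, this never throws on an empty list, so the behaviour no longer depends on an invariant that the types do not express.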

Contributor Author

Done. I thought that for a WithDescriptionTypedPipe this could not be empty.

Collaborator

it maybe can't but we didn't make that clear in the types... :/

@@ -116,7 +134,7 @@ object BeamPlanner {
val ops: Seq[BeamOp[(K, Any)]] = cg.inputs.map(tp => rec(tp))
CoGroupedOp(cg, ops)
}
go(cg)
if (cg.descriptions.isEmpty) go(cg) else go(cg).withName(cg.descriptions.last)
Collaborator

why using last here but head above?

Contributor Author

I meant to ask you about this myself. If you look at the test case in this PR, the cogrouped expression has two descriptions, "Count words" and "Join with t1", both of which appear in the descriptions of CoGrouped. But since descriptions appear to be appended, e.g. in CoGrouped.Pair and in the optimization rule ComposeDescriptions, I chose to keep the last one. Hence there is no assertion for "Count words" even in that test case.

Collaborator

Why not combine all of them? Why not just cg.descriptions.mkString("\n") or something?

Contributor Author

Yes, let me try this out on a job first. Thanks!

@@ -137,7 +155,21 @@ object BeamPlanner {

def defaultOptimizationRules(config: Config): Seq[Rule[TypedPipe]] = {
def std(forceHash: Rule[TypedPipe]) =
OptimizationRules.standardMapReduceRules :::
List(
Collaborator

why the change here? Is this copying the same cascading optimizations?

Contributor Author

It really is the standardMapReduceRules with DescribeLater excluded. I've mentioned in the summary of this PR that I chose to exclude that rule in order to capture the line numbers at the right points, without the optimizer changing the AST. I'm curious if there's a better way to do this though.

Collaborator

sorry I missed that. Note: the line numbers are already captured by the time that rule runs. The line numbers are collected on the TypedPipe. I don't see how removing that rule helps descriptions.

If I were you, I would look at all the descriptions and add the full list.

The problem with removing that rule is that it will block merging nodes together. It may be fine, maybe Beam will follow up with optimizations, but I would be careful: scalding may do some optimizations that beam doesn't.

Contributor Author

The line numbers are in the descriptions though, if users don't explicitly add them with a call to withDescription.
Here's what I'm trying to do: if we have a TypedPipe that looks something like this:
WithDescription(Mapped(WithDescription(FlatMapped(), "a")), "b")
I'm trying to associate the description "a" with the PTransform corresponding to the FlatMapped and the description "b" with the PTransform corresponding to the Mapped. By default, the "a" and "b" here are line numbers unless the user explicitly added their own descriptions.
If I let DescribeLater run, I think we'll end up with something like this:
WithDescription(FlatMapped(), ["a","b"])
(since this would also compose the Mapped and FlatMapped, and then also run ComposeDescriptions)
This would be hard to debug when there is an error in this stage of the job, since it's hard to map back to the code.
But as I write this, I'm wondering whether it's better to just concatenate the line numbers instead...
Dataflow (which is the Beam runner we're using) does apply similar optimizations (I've mentioned this in the summary), but I'm now debating whether it's a good idea to change what the optimizer does, since it definitely makes the optimizer less useful. What do you think?
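
To make the trade-off concrete, here is a toy model of the rewrite described above. It is a sketch only: the `Pipe` ADT, `hoist`, `fuse` and `describeLater` are hypothetical simplifications for illustration, not Scalding's TypedPipe or the actual Dagon rules.

```scala
object DescribeLaterSketch {
  // Toy AST: the real TypedPipe nodes also carry functions and type parameters.
  sealed trait Pipe
  case object Source extends Pipe
  final case class FlatMapped(input: Pipe) extends Pipe
  final case class Mapped(input: Pipe) extends Pipe
  final case class WithDescription(input: Pipe, descriptions: List[String]) extends Pipe

  // Pull every description out of the tree, innermost first.
  def hoist(p: Pipe): (Pipe, List[String]) = p match {
    case Source => (Source, Nil)
    case FlatMapped(in) =>
      val (bare, ds) = hoist(in)
      (FlatMapped(bare), ds)
    case Mapped(in) =>
      val (bare, ds) = hoist(in)
      (Mapped(bare), ds)
    case WithDescription(in, ds) =>
      val (bare, inner) = hoist(in)
      (bare, inner ::: ds)
  }

  // Merge a Mapped into the FlatMapped directly below it.
  // A WithDescription sitting between the two nodes blocks the merge.
  def fuse(p: Pipe): Pipe = p match {
    case Source => Source
    case FlatMapped(in) => FlatMapped(fuse(in))
    case Mapped(in) =>
      fuse(in) match {
        case FlatMapped(x) => FlatMapped(x)
        case other => Mapped(other)
      }
    case WithDescription(in, ds) => WithDescription(fuse(in), ds)
  }

  // Hoist descriptions first, then fuse, then attach all descriptions to the result.
  def describeLater(p: Pipe): Pipe = {
    val (bare, ds) = hoist(p)
    val fused = fuse(bare)
    if (ds.isEmpty) fused else WithDescription(fused, ds)
  }
}
```

In this model, `describeLater` turns `WithDescription(Mapped(WithDescription(FlatMapped(Source), List("a"))), List("b"))` into `WithDescription(FlatMapped(Source), List("a", "b"))`, while calling `fuse` alone leaves the tree unchanged because the inner description separates the two nodes.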

Collaborator

I don't really know the answer. So, line numbers are added to descriptions when the user constructs the TypedPipe, see:

private[scalding] def withLine: TypedPipe[T] =

The idea of DescribeLater is to try to merge as many operations into one step and then put all the line numbers/descriptions on that single step. By taking just a single item you are hiding which actions might be combined.

As to what to do, I don't know. If beam's optimizer is very good, it maybe doesn't matter. Maybe you should try two separate runs with a somewhat complex job and compare?

Also, I can imagine a Config setting like scalding.preserve_description_order that removes applying that optimization. Then if you were debugging, you could set that to true, but for a production job you could set it to false.

I would probably bias to just combining all the descriptions into a single beam description unless you actually see problems. You can always come back and add that setting. I would personally bias to maximizing the utility of the optimizer.
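
A rough sketch of what such a switch could look like. Everything here is an assumption for illustration: the key `scalding.preserve_description_order` is only the name floated above, not an existing Scalding setting, the config is a plain `Map`, and rules are represented by their names rather than real `Rule[TypedPipe]` values.

```scala
object RuleSelection {
  // Hypothetical key from the suggestion above; not an existing Scalding setting.
  val PreserveKey = "scalding.preserve_description_order"

  // Drop the DescribeLater rule only when the flag is explicitly set to "true".
  def selectRules(config: Map[String, String], standard: List[String]): List[String] = {
    val preserve = config.get(PreserveKey).exists(_.trim.equalsIgnoreCase("true"))
    if (preserve) standard.filterNot(_ == "DescribeLater") else standard
  }
}
```

Defaulting to the full rule list keeps production jobs on the optimized path, and a debugging run opts out explicitly.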

Contributor Author

Thanks. Let me try out some jobs with the rule enabled and showing all the descriptions.

import com.twitter.scalding.typed.functions.ComposedFunctions.ComposedMapGroup
import com.twitter.scalding.typed.functions.{EmptyGuard, MapValueStream, ScaldingPriorityQueueMonoid, SumAll}
import com.twitter.scalding.typed.{CoGrouped, Input}
import com.twitter.scalding.typed.{CoGrouped, TypedSource}
Collaborator

Why the change back from Input?

(I think I know the answer: you all are actually using a fork... I really hope you will get onto the mainline branch. I am contributing my time to help you, I hope you will contribute your energy to help others by actually using the open source version).

Contributor Author

I didn't really change it back...this was just a missed import, but you're right that we're using a fork. Thank you for investing your time in this, it really is much appreciated! I have it on my todo list to get us on the mainline branch. I do need to go through your earlier refactor more closely though.

Collaborator

I don't follow. I changed this code to use Input, which is a subclass of and replacement for TypedSource. It seems the diff changes it back from Input to TypedSource. Can you restate what you meant by this being a "missed import"?

Contributor Author

Sorry I wasn't clear. This change is now outdated - I updated it to remove the reference to TypedSource. The reason it was there in the first place was because I brought it over from our fork, but I hadn't removed unused imports, which is why it showed up. You should see no reference to TypedSource now in the diff.

rec[a](wd.input)
case (wd: WithDescriptionTypedPipe[_], rec) => {
val op = rec(wd.input)
wd.descriptions match {
Collaborator

actually, as commented, I think this is probably running the risk of dropping some descriptions.

I would do:

op.withName(wd.descriptions.map(_._1).mkString(", "))

otherwise I think you will wind up with cases where you lose line numbers.
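
As a self-contained illustration of that suggestion, the sketch below joins every description into one name. The helper `joinedName` is hypothetical, and descriptions are again assumed to be (text, flag) pairs.

```scala
object JoinedName {
  // Join all description texts into a single name.
  // Returns None for an empty list so the caller can leave the op unnamed.
  def joinedName(descriptions: List[(String, Boolean)], sep: String = ", "): Option[String] = {
    val texts = descriptions.map(_._1)
    if (texts.isEmpty) None else Some(texts.mkString(sep))
  }
}
```

Returning an `Option` avoids giving an op an empty-string name when no descriptions are present.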

Contributor Author

Thanks, I'll try this out.

Collaborator

@johnynek johnynek left a comment

this generally looks good, I'm just worried about two things:

  1. disabling moving descriptions deactivating many (most?) optimizations
  2. not including all the descriptions (I think you will lose line numbers).

4 participants