
[ADAM-2023] Implemented Duplicate Marking algorithm in Spark SQL #2045

Conversation

jonpdeaton

With these changes, I see as much as a 30% speedup for large datasets. Fixes #2023

@AmplabJenkins

Can one of the admins verify this patch?

@heuermh
Member

heuermh commented Sep 7, 2018

Jenkins, test this please

@heuermh
Member

heuermh commented Sep 7, 2018

Jenkins, add to whitelist

* @param alignmentRecords GenomicRDD of alignment records
* @return RDD of alignment records with the "duplicateRead" field marked appropriately
*/
def apply(alignmentRecords: AlignmentRecordRDD): RDD[AlignmentRecord] = {
Member


We should refactor the caller here so that we don't force the conversions between RDD and Dataset. In other words, only perform the conversions if alignmentRecords has been realized as RDDBoundAlignmentRecordRDD.

Author


I see, so there should be another apply method which takes Dataset[AlignmentRecord] and this apply method's signature should be changed to take RDDBoundAlignmentRecordRDD?

Author


Or do you mean that this method should just be changed to take Dataset[AlignmentRecord]? In that case, would it also make sense to refactor so that the other apply method just takes Dataset[Fragment], with a similar refactoring of its caller?

Member


Does this look right? We would want the caller to be

abstract class AlignmentRecordRDD ... {
  def markDuplicates(): AlignmentRecordRDD = {
    replaceRdd(MarkDuplicates(this.rdd, this.recordGroups))
  }
}

case class DatasetBoundAlignmentRecordRDD ... {
  override def markDuplicates(): AlignmentRecordRDD = {
    replaceDataset(MarkDuplicates(this.dataset, this.recordGroups))
  }
}

abstract class FragmentRDD ... {
  def markDuplicates(): FragmentRDD = {
    replaceRdd(MarkDuplicates(this.rdd, this.recordGroups))
  }
}

case class DatasetBoundFragmentRDD ... {
  override def markDuplicates(): FragmentRDD = {
    replaceDataset(MarkDuplicates(this.dataset, this.recordGroups))
  }
}

so then the apply methods might be

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.bdgenomics.adam.models.RecordGroupDictionary
import org.bdgenomics.formats.avro.{
  AlignmentRecord,
  Fragment
}
import org.bdgenomics.adam.sql.{
  AlignmentRecord => AlignmentRecordProduct,
  Fragment => FragmentProduct
}

object MarkDuplicates {
  def apply(alignmentRecords: RDD[AlignmentRecord], recordGroups: RecordGroupDictionary): RDD[AlignmentRecord] = { ??? }
  def apply(fragments: RDD[Fragment], recordGroups: RecordGroupDictionary): RDD[Fragment] = { ??? }
  def apply(alignmentRecords: Dataset[AlignmentRecordProduct], recordGroups: RecordGroupDictionary): Dataset[AlignmentRecordProduct] = { ??? }
  def apply(fragments: Dataset[FragmentProduct], recordGroups: RecordGroupDictionary): Dataset[FragmentProduct] = { ??? }
}

Author


Alright, sounds good. Because of type erasure, there will need to be a single generic apply[T] for the RDD case and another for the Dataset case, each covering both fragments and alignment records.
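
For illustration, a minimal self-contained sketch of the clash (toy stand-in types, not the real ADAM classes), showing why the two RDD overloads above cannot coexist and how a single generic apply[T] sidesteps it:

import org.apache.spark.rdd.RDD

// Toy stand-ins; the real types live in the ADAM/bdg-formats packages.
case class AlignmentRecord(readName: String)
case class Fragment(name: String)
case class RecordGroupDictionary(recordGroups: Seq[String])

object ErasureSketch {
  // These two overloads both erase to apply(RDD, RecordGroupDictionary),
  // so the Scala compiler rejects them ("have same type after erasure"):
  //
  //   def apply(reads: RDD[AlignmentRecord], rgd: RecordGroupDictionary): RDD[AlignmentRecord] = ???
  //   def apply(fragments: RDD[Fragment], rgd: RecordGroupDictionary): RDD[Fragment] = ???
  //
  // A single generic entry point avoids the clash; the concrete duplicate
  // marking logic would then dispatch on the element type internally.
  def apply[T](rdd: RDD[T], recordGroups: RecordGroupDictionary): RDD[T] = {
    rdd // placeholder: the real implementation would mark duplicates here
  }
}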

Member


Ah, good point. Is RecordGroupDictionary actually required for the fragment cases? If not, that could be the discriminator.

Author


I believe it is necessary for both, so it can't be used as the discriminator.

Author


Would it be better to take RDDBoundAlignmentRecordRDD and DatasetBoundAlignmentRecordRDD, so that the RDD-to-Dataset conversion already implemented in RDDBoundAlignmentRecordRDD is not duplicated within MarkDuplicates?

* Case class which merely extends the Fragment schema by a single column "duplicateFragment" so that
* a DataFrame with fragments marked as duplicates can be cast back into a Dataset
*/
private case class FragmentDuplicateSchema(readName: Option[String] = None,
Member


Do you think it would be useful to add a duplicateFragment (or similarly named) field to the Avro schema definition for Fragment, or is this flag only useful in a temporary context?

Author


I'm not sure. I was considering it while developing, but then I realized that it really only seems useful in a temporary context: each of the alignment records contained within the fragment already carries a duplicate flag, so adding one at the fragment level would make the schema redundant.
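
For context, a minimal sketch of the temporary-schema pattern being discussed (illustrative field names only, not the full Fragment schema): the marked DataFrame is cast back to a typed Dataset via a case class that carries the extra flag column, so the flag never needs to live in the Avro schema.

import org.apache.spark.sql.SparkSession

// Illustrative only: the real FragmentDuplicateSchema mirrors the full Fragment schema.
case class FragmentDuplicateSketch(readName: Option[String] = None,
                                   duplicateFragment: Option[Boolean] = None)

object FragmentDuplicateSketchExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // A DataFrame produced by the duplicate-marking query, with the extra flag column...
    val markedDf = Seq(("read1", true), ("read2", false))
      .toDF("readName", "duplicateFragment")

    // ...can be cast back into a typed Dataset because its columns line up
    // with the case class fields.
    val markedDs = markedDf.as[FragmentDuplicateSketch]
    markedDs.show()

    spark.stop()
  }
}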

@coveralls

Coverage Status

Coverage decreased (-0.08%) to 79.054% when pulling 5310fce on jonpdeaton:enhancement/MarkDuplictes-SparkSQL into f1cc2cf on bigdatagenomics:master.

@coveralls

coveralls commented Sep 7, 2018

Coverage Status

Coverage decreased (-0.1%) to 79.004% when pulling 8e5e245 on jonpdeaton:enhancement/MarkDuplictes-SparkSQL into f1cc2cf on bigdatagenomics:master.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/ADAM-prb/2804/
Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/ADAM-prb/2807/
Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/ADAM-prb/2808/
Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/ADAM-prb/2815/
Test PASSed.

Because the Spark SQL implementation of duplicate marking
was not scaling well to cluster runs, this version converts
many of the groupBy-followed-by-join steps back into operations
on the original dataset using Window functions. This hopefully reduces
the amount of data that has to be shuffled when running on a cluster
and will make the performance benefits scale.
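
As a rough illustration of the reshaping described above (made-up column names, not the actual MarkDuplicates query), the same "flag everything that is not the best-scoring read in its group" step can be written either as a groupBy plus join or as a Window function over one partitioning:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max}

object DuplicateFlagSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

    // Toy reads: group key (library, position) plus a score used to pick the best read.
    val reads = spark.range(0, 1000).selectExpr(
      "id % 10 as library",
      "id % 100 as position",
      "rand() as score")

    // groupBy + join: aggregate per group, then shuffle again to join the result back.
    val best = reads.groupBy("library", "position").agg(max("score").as("bestScore"))
    val markedViaJoin = reads.join(best, Seq("library", "position"))
      .withColumn("duplicateRead", col("score") < col("bestScore"))

    // Window function: a single partitioning carries the per-group maximum to every row.
    val byGroup = Window.partitionBy("library", "position")
    val markedViaWindow = reads
      .withColumn("bestScore", max("score").over(byGroup))
      .withColumn("duplicateRead", col("score") < col("bestScore"))

    markedViaJoin.show(5)
    markedViaWindow.show(5)
    spark.stop()
  }
}

Both shapes produce the same flags; the Window form avoids materializing the aggregated table and re-shuffling it for the join.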
@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/ADAM-prb/2820/
Test PASSed.

@heuermh
Member

heuermh commented Jan 6, 2020

Closing as WontFix. Performance testing of these changes was inconclusive. Feel free to create a new PR after rebasing against git head.

@heuermh heuermh closed this Jan 6, 2020
@heuermh heuermh added this to the 0.31.0 milestone Jan 6, 2020