[SNAP-2366] row buffer fault-in, forced rollover, merge small batches #1046

Open
wants to merge 32 commits into master

Conversation

Contributor

@sumwale sumwale commented Jun 4, 2018

Changes proposed in this pull request

  • add checks for two cases in the table stats service (a minimal sketch of these
    two checks follows this list):
    • a "large enough" row buffer (currently "large enough" is anything more than maxDeltaRows/8)
      that has not seen any updates/deletes since the last check; in this case schedule
      a task to force rollover of the row buffer into the column table
    • a bucket of the column table having multiple small batches (non-transactional check);
      if so, submit a task to merge those after checking for a transactional snapshot;
      the merge is done by a locally created ColumnTableScan->ColumnInsertExec plan where the scan
      uses an iterator only over the small batches
  • added a ColumnFormatStatsIterator that can take a set of stats rows and create an iterator
    over just those (as required for the batch merge)
  • added new scan metrics for disk reads: a) disk rows from row buffer,
    b) partial column batches on disk, c) full column batches on disk
  • extended SQLMetrics types with a new SPLIT_SUM_METRIC that allows displaying multiple
    metrics against a common name; ColumnTableScan now uses this to combine some metrics,
    since otherwise the display becomes too large (especially with the newly added disk read metrics)
  • use hive metadata (ExternalTableMetaData) to get the number of rows instead of reading it
    from the row buffer table (which is subject to change in future)
  • added a metric for remote batch fetch
  • fix an NPE in SnappyTableStatsProviderService while filling up the
    result map from members, since a ConcurrentHashMap cannot hold null values
  • use a common entry map in ColumnFormatIterator disk iteration instead of creating
    a separate one for every column batch
  • added an implementation of PURGE_CODEGEN_CACHES in StoreCallbacksImpl (clearCodegenCaches)
  • limit to one task per table for background rolloverRowBuffer and mergeSmallBatches tasks
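
A minimal sketch of the two checks in the first item above, using simplified stand-in types (BucketSnapshot and maxDeltaRows here are illustrative, not the actual stats-service code):

// illustrative stand-in for the per-bucket statistics the service inspects
case class BucketSnapshot(rowBufferRows: Long,
    hadUpdatesOrDeletesSinceLastCheck: Boolean, smallBatchCount: Int)

// case 1: row buffer is "large enough" (> maxDeltaRows/8) and has been quiet
// since the last check => schedule a forced rollover into the column table
def needsRollover(b: BucketSnapshot, maxDeltaRows: Int): Boolean =
  b.rowBufferRows > maxDeltaRows / 8 && !b.hadUpdatesOrDeletesSinceLastCheck

// case 2: the bucket holds more than one small column batch => schedule a merge
def needsMerge(b: BucketSnapshot): Boolean = b.smallBatchCount > 1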

Patch testing

precheckin; new unit tests to be added next

ReleaseNotes.txt changes

NA

Other PRs

TIBCOSoftware/snappy-store#391

Sumedh Wale and others added 6 commits May 30, 2018 05:42
- fixed disk metrics collection added previously; set the metric correctly
  for both row buffer iterator (ResultSetTraversal) and ColumnFormatIterator
- fixed multiple ColumnTableScans causing split metrics to add up into one
  ColumnTableScan; now use a unique ID for split metrics for each ColumnTableScan instance
- replaced a few usages of Map.put with justPut for koloboke maps
Trilok Khairnar and others added 3 commits June 5, 2018 23:19
Conflicts:
	core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala
	core/src/main/scala/io/snappydata/TableStatsProviderService.scala
	store
Conflicts:
	cluster/src/test/scala/io/snappydata/benchmark/TPCHColumnPartitionedTable.scala
Contributor

@rishitesh rishitesh left a comment


First set of comments. I can review more once the point about updated & deleted columns is clarified.

}
}

private def withExceptionHandling(f: => Unit, doFinally: () => Unit = null): Unit = {
Contributor

This method can be moved to the Utils class.

Contributor Author

Will do
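
For reference, a rough sketch of how the helper could look once moved there (the body below is assumed; only the signature appears in the diff above):

import scala.util.control.NonFatal

def withExceptionHandling(f: => Unit, doFinally: () => Unit = null): Unit = {
  try f
  catch {
    // a failed background task should not propagate to the calling thread
    case NonFatal(t) => t.printStackTrace()
  } finally {
    if (doFinally ne null) doFinally()
  }
}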

logInfo(s"Found small batches in ${pr.getName}: ${smallBatches.map(_.getId)}")
val cache = pr.getGemFireCache
implicit val executionContext = Utils.executionContext(cache)
Future(withExceptionHandling({
Contributor

There may be a situation where many such future tasks pile up for the same BucketRegion if the future's thread does not get a chance to complete before the next stats publish. It would be better to mark the BucketRegion and exclude in-progress buckets from being picked again.

Contributor Author

That cannot happen because this deliberately uses a "mergeTasks" map with "computeIfAbsent", which adds only one future per PR at a time. The mergeTasks entry for that PR is cleared only at the end of the Future's execution. The same applies to rolloverTasks.
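
A minimal sketch of that guard, assuming a ConcurrentHashMap-based registry (the names MergeTaskGuard/submitMerge below are illustrative, not the exact fields in this PR):

import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}
import scala.concurrent.{ExecutionContext, Future}

class MergeTaskGuard(implicit ec: ExecutionContext) {
  private val mergeTasks = new ConcurrentHashMap[String, Future[Unit]]()

  def submitMerge(prName: String)(work: => Unit): Unit = {
    // computeIfAbsent registers at most one future per partitioned region;
    // a later stats-publish cycle sees the existing entry and schedules nothing
    mergeTasks.computeIfAbsent(prName, new JFunction[String, Future[Unit]] {
      override def apply(name: String): Future[Unit] = Future {
        // the entry is removed only when the future completes, after which a
        // new task for this region can be scheduled again
        try work finally mergeTasks.remove(name)
      }
    })
  }
}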

partitionColumnAliases = Nil, baseRelation = null, schema, allFilters = Nil,
schemaAttrs, caseSensitive = true)
// reduce min delta row size to avoid going through rolloverRowBuffers again
val insertPlan = ColumnInsertExec(tableScan, Nil, Nil,
Contributor

Can you please explain here, maybe in the comment section, how this will handle updated columns and deleted columns?

Contributor Author

The ColumnBatchIterator is passed the stats row entry. All the remaining columns, including delta/delete, are looked up by the iterator (see ColumnFormatStatsIterator.getColumnValue) when the generated code asks for them.

So this is the same as iterating batches in ColumnTableScan, which returns merged entries with deltas/deletes applied. The ColumnInsertExec is tied to the output of this scan, hence it will create a combined merged batch.

Note that merging of deltas into the main batch (when they become large enough), or of deletes into the main batch (when a large number of entries are deleted), will be handled separately. That does not depend on the main batch being small but rather on the deltas being large, and that merge needs to be done in the operation thread itself (the one that created the last delta causing it to grow large). Right now only the case where all entries are deleted is handled.
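
To make the idea concrete, a purely illustrative, self-contained sketch (not the actual SnappyData classes): the scan side resolves deltas and deletes per row, so the insert side only sees merged rows and writes out a single combined batch.

// hypothetical simplified shape of a small column batch, for illustration only
final case class SmallBatch(values: Map[Long, String],   // rowId -> base value
    deltas: Map[Long, String],                            // rowId -> updated value
    deleted: Set[Long])                                   // deleted rowIds

// scan side: skip deleted rows and let a delta value win over the base value
def scanMerged(batch: SmallBatch): Iterator[(Long, String)] =
  batch.values.iterator.collect {
    case (rowId, base) if !batch.deleted.contains(rowId) =>
      rowId -> batch.deltas.getOrElse(rowId, base)
  }

// "insert" side: the new batch is built from already-merged rows (assuming
// rowIds are unique across the input batches), so it starts with no deltas
// or deletes of its own
def mergeSmallBatches(batches: Seq[SmallBatch]): SmallBatch =
  SmallBatch(batches.iterator.flatMap(scanMerged).toMap, Map.empty, Set.empty)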

Conflicts:
	core/src/main/scala/org/apache/spark/sql/store/CompressionCodecId.scala
@sumwale sumwale changed the base branch from GITHUB-982 to master June 8, 2018 00:09