Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move to Collections API and optimize count algorithm #47

Open
lossyrob opened this issue Jul 19, 2017 · 3 comments
Open

Move to Collections API and optimize count algorithm #47

lossyrob opened this issue Jul 19, 2017 · 3 comments

Comments

@lossyrob
Copy link

This code provides a starting point for the Collections API optimizations, and is basically an optimized version of def histograms(nlcd: TileLayerRDD[SpatialKey], soil: TileLayerRDD[SpatialKey], multiPolygons: Seq[MultiPolygon]): Seq[Map[(Int, Int), Int]] that uses the Collections API:

val layerReader = S3CollectionLayerReader("datahub-catalogs-us-east-1", "")
val queryPoly: MultiPolygon = ???
val nlcdLayerId = LayerId("nlcd-2011-30m-epsg5070-int8", 0)
val soilLayerId = LayerId("ssurgo-hydro-groups-30m-epsg5070-int8", 0)

val (nlcdLayer, soilLayer) = {
  val fetch1 =
    Future {
      timeFetch(s"  Fetch Time NLCD:") {
        baseLayerReader
          .query[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](nlcdLayerId)
          .where(Intersects(queryPoly)).result
      }
    }

  val fetch2 =
    Future {
      timeFetch(s"  Fetch Time SOIL:") {
        baseLayerReader
          .query[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](soilLayerId)
          .where(Intersects(queryPoly)).result
      }
    }

  val fetches =
    for(
      (_, layer1) <- fetch1;
      (_, layer2) <- fetch2
    ) yield { (layer1, layer2) }

  Await.result(fetches, 10 minutes)
}

histograms(
  nlcdLayer,
  soilLayer,
  queryPoly
)

where histograms is:

def histograms(
  layer1: TileLayerCollection[SpatialKey],
  layer2: TileLayerCollection[SpatialKey],
  polygon: MultiPolygon
): Map[(Int, Int), Long] = {
  import spire.syntax.cfor._

  val mapTransform = layer1.metadata.layout.mapTransform

  val layer1Map = layer1.toMap
  val layer2Map = layer2.toMap

  val result =
    layer1Map.keys.toSet
      .intersect(layer2Map.keys.toSet)
      .par
      .map { k =>
        val tileResult = mutable.Map[(Int, Int), Long]()
        val (tile1, tile2) = (layer1Map(k), layer2Map(k))
        // Assumes tile1.dimensions == tile2.dimensions
        val re = RasterExtent(mapTransform(k), tile1.cols, tile1.rows)

        if(polygon.contains(re.extent)) {
          cfor(0)(_ < re.cols, _ + 1) { col =>
            cfor(0)(_ < re.rows, _ + 1) { row =>
              val v1 = tile1.get(col, row)
              val v2 = tile2.get(col, row)
              val k = (v1, v2)
              if(!tileResult.contains(k)) { tileResult(k) = 0 }
              tileResult(k) += 1
            }
          }
        } else {
          polygon.foreach(re, Options(includePartial=true, sampleType=PixelIsArea)) { (col, row) =>
            val v1 = tile1.get(col, row)
            val v2 = tile2.get(col, row)
            val k = (v1, v2)
            if(!tileResult.contains(k)) { tileResult(k) = 0 }
            tileResult(k) += 1
          }
        }

        tileResult.toMap
      }
      .reduce { (m1, m2) =>
        (m1.toSeq ++ m2.toSeq)
          .groupBy(_._1)
          .map { case(k, values) => k -> values.map(_._2).sum }
      }

  result
}
@lossyrob
Copy link
Author

lossyrob commented Jul 19, 2017

The Await is required because I was running in a standalone process. In an akka-http route, you could simply return the Future, in which case the top portion of code should read:

val layerReader = S3CollectionLayerReader("datahub-catalogs-us-east-1", "")
val queryPoly: MultiPolygon = ???
val nlcdLayerId = LayerId("nlcd-2011-30m-epsg5070-int8", 0)
val soilLayerId = LayerId("ssurgo-hydro-groups-30m-epsg5070-int8", 0)

val nlcdFetch =
  Future {
    baseLayerReader
      .query[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](nlcdLayerId)
      .where(Intersects(poly)).result
  }

val soilFetch =
  Future {
    baseLayerReader
      .query[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](soilLayerId)
      .where(Intersects(poly)).result
  }

for(
  nlcdLayer <- nlcdFetch;
  soilLayer <- soilFetch
) yield {
  histograms(
    nlcdLayer,
    soilLayer,
    queryPoly
  )
}

@rajadain
Copy link
Member

(I apologize in advance for the length of this comment.)

Hi @lossyrob,

In our production code, we have to support 1-3 layers. Thus, we cannot work with specific ones as shown in the code above. To support a dynamic number of layers, we usually do something like:

val layerReader = S3CollectionLayerReader("datahub-catalogs-us-east-1", "")

def layerFetch (
  layerId: LayerId,
  multiPolygon: MultiPolygon
): TileLayerCollection[SpatialKey] =
  layerReader
    .query[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](layerId)
    .where(Intersects(multiPolygon))
    .result

def doTheThing (
  input: InputJson
) = {
  val layerIds = input.layerIds.map(lid => LayerId(lid, input.zoom))
  val multiPolygon = input.polygons.unionGeometries.asMultiPolygon.get

  val layers = layerIds.map(layerFetch(_, multiPolygon))

  rasterGroupedCount(layers, multiPolygon)
}

Where doTheThing will be called by the proper AKKA-Http route.

We adapted your histogram from above into rasterGroupedCount to work with a sequence of layers instead of two explicit ones:

def rasterGroupedCount (
  layers: Seq[TileLayerCollection[SpatialKey]],
  multiPolygon: MultiPolygon
): Map[Seq[Int], Long] = {
  val mapTransform = layers.head.metadata.mapTransform

  joinCollectionLayers(layers)
    .par
    .map { case (key, tiles) => 
      val tileResult = mutable.Map[Seq[Int], Long]()
      val re = RasterExtent(mapTransform(key), tiles.head.cols, tiles.head.rows)

      if (multiPolygon.contains(re.extent)) {
        cfor(0)(_ < re.cols, _ + 1) { col => 
          cfor(0)(_ < re.rows, _ + 1) { row => 
            val vs = tiles.map(_.get(col, row))
            if (!tileResult.contains(vs)) {
              tileResult(vs) = 0
            }

            tileResult(vs) += 1
          }
        }
      } else {
        multiPolygon.foreach(re, Options(includePartial=true, sampleType=PixelIsArea)) { (col, row) =>
          val vs = tiles.map(_.get(col, row))
          if (!tileResult.contains(vs)) {
            tileResult(vs) = 0
          }

          tileResult(vs) += 1
        }
      }

      tileResult.toMap
    }
    .reduce { (m1, m2) => 
      (m1.toSeq ++ m2.toSeq)
        .groupBy(_._1)
        .map { case (key, values) => key -> values.map(_._2).sum }
    }
}

Here is an alternative implementation of rasterGroupedCount that @echeipesh came up with a few months ago:

def rasterGroupedCount (
  layers: Seq[TileLayerCollection[SpatialKey]],
  multiPolygon: MultiPolygon
): Map[Seq[Int], Long] = {
  val init = () => new LongAdder
  val update = (_: LongAdder).increment()
  val metadata = layers.head.metadata

  val pixelGroups: TrieMap[Seq[Int], LongAdder] = TrieMap.empty

  joinCollectionLayers(layers)
    .par
    .foreach { case (key, tiles) =>
      val re = RasterExtent(metadata.mapTransform(key), metadata.layout.tileCols, metadata.layout.tileRows)

      Rasterizer.foreachCellByMultiPolygon(multiPolygon, re) { case (col, row) =>
        val pixelGroup: Seq[Int] = tiles.map(_.get(col, row))
        val acc = pixelGroups.getOrElseUpdate(pixelGroup, init())
        update(acc)
      }
    }

  pixelGroups.mapValues(_.sum())
}

where joinCollectionLayers used in both implementations is:

def joinCollectionLayers(layers: Seq[TileLayerCollection[SpatialKey]]): Map[SpatialKey, Seq[Tile]] = {
  val maps: Seq[Map[SpatialKey, Tile]] = layers.map((_: Seq[(SpatialKey, Tile)]).toMap)
  val keySet: Array[SpatialKey] = maps.map(_.keySet).reduce(_ union _).toArray
  for (key: SpatialKey <- keySet) yield {
    val tiles: Seq[Tile] = maps.map(_.apply(key))
    key -> tiles
  }
}.toMap

In our benchmarks, we found the fastest implementations, quickest first, to be:

  1. Your histograms with two explicit layers (avg 2.6s for ~3800 sq.km.)
  2. Eugene's rasterGroupedCount with a list of layers (avg 3.25s for ~3800 sq.km.)
  3. The adapted rasterGroupedCount above with cfor with a list of layers (avg 3.8s for ~3800 sq.km.)

Given that we do want to operate on a list of layers, my questions are:

  1. Can joinCollectionLayers be made faster, since that seems to be holding the cfor implementation back?
  2. Is there any reason to prefer cfor over the more concise TrieMap / LongAdder variant?
  3. All this work is disregarding Future since we wanted to focus on the calculations, hoping to add asynchrony later. Should we be using that here somewhere, and would it increase performance?

@lossyrob
Copy link
Author

I tried some optimizations, but it looks like nothing I did significantly (or insignificantly) improved on the TrieMap implementation.

My suggestion for further optimization is to use Futures for fetching layers in parallel. This would mean changing this method to return a Future[Seq[TileLayerCollection[SpatialKey]]], and spidering the Futures through the codebase. akka-http lets you return a Future from the route, so you could carry the future all the way through to the route.

So the start of the refactor starts with moving the method to something like

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

  def cropRastersToAOI(
    rasterIds: List[String],
    zoom: Int,
    aoi: MultiPolygon
  ): Future[Seq[TileLayerCollection[SpatialKey]]] =
    Future.sequence {
      val futures: Seq[Future[TileLayerCollection[SpatialKey]]] =
        rasterIds
          .map { str =>
            Future {
              cropSingleRasterToAOI(str, zoom, aoi)
            }
          }
      futures
    }

and fixing compiler errors from there, by changing everything to work with futures by mapping into the future. E.g.:

  def getRasterGroupedCount(input: InputData): Future[ResultInt] = {
    val aoi = createAOIFromInput(input)
    val rasterLayers = cropRastersToAOI(input.rasters, input.zoom, aoi)
    rasterLayers.map { layers => ResultInt(rasterGroupedCount(layers, aoi)) }
  }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants