Skip to content

v0.2.48..v0.2.49 changeset ParallelBoundedApiReader.cpp

Garret Voltz edited this page Oct 2, 2019 · 1 revision
diff --git a/hoot-core/src/main/cpp/hoot/core/io/ParallelBoundedApiReader.cpp b/hoot-core/src/main/cpp/hoot/core/io/ParallelBoundedApiReader.cpp
new file mode 100644
index 0000000..fbb40b4
--- /dev/null
+++ b/hoot-core/src/main/cpp/hoot/core/io/ParallelBoundedApiReader.cpp
@@ -0,0 +1,280 @@
+/*
+ * This file is part of Hootenanny.
+ *
+ * Hootenanny is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --------------------------------------------------------------------
+ *
+ * The following copyright notices are generated automatically. If you
+ * have a new notice to add, please use the format:
+ * " * @copyright Copyright ..."
+ * This will properly maintain the copyright information. DigitalGlobe
+ * copyrights will be updated automatically.
+ *
+ * @copyright Copyright (C) 2019 DigitalGlobe (http://www.digitalglobe.com/)
+ */
+
+#include "ParallelBoundedApiReader.h"
+
+//  Hootenanny
+#include <hoot/core/io/HootNetworkRequest.h>
+#include <hoot/core/util/FileUtils.h>
+#include <hoot/core/util/GeometryUtils.h>
+#include <hoot/core/util/StringUtils.h>
+
+//  Qt
+#include <QUrlQuery>
+
+namespace hoot
+{
+
+ParallelBoundedApiReader::ParallelBoundedApiReader(bool useOsmApiBboxFormat, bool addProjection)
+  : _dataType(DataType::Text),
+    _totalResults(0),
+    _totalEnvelopes(0),
+    _bboxContinue(true),
+    _coordGridSize(ConfigOptions().getReaderHttpBboxMaxSize()),
+    _maxGridSize(ConfigOptions().getReaderHttpBboxMaxDownloadSize()),
+    _threadCount(ConfigOptions().getReaderHttpBboxThreadCount()),
+    _fatalError(false),
+    _useOsmApiBboxFormat(useOsmApiBboxFormat),
+    _addProjection(addProjection),
+    _continueRunning(true),
+    _filenumber(0)
+{
+}
+
+ParallelBoundedApiReader::~ParallelBoundedApiReader()
+{
+  stop();
+}
+
+void ParallelBoundedApiReader::beginRead(const QUrl& endpoint, const geos::geom::Envelope& envelope)
+{
+  //  Validate the size of the envelope before beginning, don't allow the whole earth to be downloaded!
+  if (envelope.getWidth() > _maxGridSize || envelope.getHeight() > _maxGridSize)
+  {
+    throw UnsupportedException("Cannot request areas larger than " +
+                               QString::number(_maxGridSize, 'f', 4) + " square degrees.");
+  }
+  //  Save the endpoint URL to query
+  _url = endpoint;
+  //  Split the envelope if it is bigger than the prescribed max
+  int lon_div = 1;
+  int lat_div = 1;
+  if (envelope.getWidth() > _coordGridSize)
+    lon_div = (int)std::ceil(envelope.getWidth() / _coordGridSize);
+  if (envelope.getHeight() > _coordGridSize)
+    lat_div = (int)std::ceil(envelope.getHeight() / _coordGridSize);
+  //  Record the number of envelopes to start with
+  _totalEnvelopes = lat_div * lon_div;
+  //  Setup the envelopes to query in a grid
+  for (int i = 0; i < lon_div; ++i)
+  {
+    double lon = envelope.getMinX() + _coordGridSize * i;
+    for (int j = 0; j < lat_div; ++j)
+    {
+      double lat = envelope.getMinY() + _coordGridSize * j;
+      _bboxMutex.lock();
+      //  Start at the upper right corner and create boxes left to right, top to bottom
+      _bboxes.push(
+          geos::geom::Envelope(
+              lon,
+              std::min(lon + _coordGridSize, envelope.getMaxX()),
+              lat,
+              std::min(lat + _coordGridSize, envelope.getMaxY())));
+      _bboxMutex.unlock();
+    }
+  }
+  //  Start up the processing threads
+  for (int i = 0; i < _threadCount; ++i)
+    _threads.push_back(std::thread(&ParallelBoundedApiReader::_process, this));
+}
+
+bool ParallelBoundedApiReader::isComplete()
+{
+  //  Get the total number of envelopes
+  _bboxMutex.lock();
+  int envelopes = _totalEnvelopes;
+  _bboxMutex.unlock();
+  //  Get the total number of results
+  _resultsMutex.lock();
+  int results = _totalResults;
+  _resultsMutex.unlock();
+  //  Done means there is one result for each envelope
+  return envelopes == results && results > 0;
+}
+
+bool ParallelBoundedApiReader::getSingleResult(QString& result)
+{
+  bool success = true;
+  //  takeFirst() pops the first element and returns it
+  _resultsMutex.lock();
+  if (_resultsList.size() > 0)
+    result = _resultsList.takeFirst();
+  else
+    success = false;
+  _resultsMutex.unlock();
+  //  Return the result found
+  return success;
+}
+
+bool ParallelBoundedApiReader::hasMoreResults()
+{
+  _resultsMutex.lock();
+  bool more = _resultsList.size() > 0;
+  _resultsMutex.unlock();
+  bool done = isComplete();
+  //  There are more results when the queue contains results
+  //  or the threads are still processing envelopes
+  //  and there isn't an error
+  return (more || !done) && !_fatalError;
+}
+
+void ParallelBoundedApiReader::wait()
+{
+  //  Wait on the threads to complete
+  for (std::size_t i = 0; i < _threads.size(); ++i)
+    _threads[i].join();
+}
+
+void ParallelBoundedApiReader::stop()
+{
+  //  Stop the threads
+  _continueRunning = false;
+  //  Wait for all threads to stop
+  wait();
+}
+
+void ParallelBoundedApiReader::_sleep()
+{
+  //  Sleep for 10 milliseconds
+  std::this_thread::sleep_for(std::chrono::milliseconds(10));
+}
+
+void ParallelBoundedApiReader::_process()
+{
+  //  Continue working until all of the results are back
+  while (!isComplete() && _continueRunning && !_fatalError)
+  {
+    //  Try to grab the next envelope on the queue
+    geos::geom::Envelope envelope;
+    envelope.setToNull();
+    _bboxMutex.lock();
+    if (!_bboxes.empty())
+    {
+      envelope = _bboxes.front();
+      _bboxes.pop();
+    }
+    _bboxMutex.unlock();
+    //  Make sure that we got a valid envelope
+    if (!envelope.isNull())
+    {
+      //  Add the bbox to the query string
+      QUrl url = _url;
+      QUrlQuery query(_url);
+      if (query.hasQueryItem("bbox"))
+        query.removeQueryItem("bbox");
+      //  Use the correct type of bbox for this query
+      QString bboxQuery;
+      if (_useOsmApiBboxFormat)
+        bboxQuery = GeometryUtils::toConfigString(envelope);
+      else
+        bboxQuery = GeometryUtils::toString(envelope);
+      //  Some APIs require the bounding box's projection, add it here
+      if (_addProjection)
+        bboxQuery += ",EPSG:4326";
+      query.addQueryItem("bbox", bboxQuery);
+      url.setQuery(query);
+
+      HootNetworkRequest request;
+      LOG_VART(url);
+      request.networkRequest(url);
+      //  Check the HTTP status code and result
+      int status = request.getHttpStatus();
+      QString result = QString::fromUtf8(request.getResponseContent().data());
+      switch (status)
+      {
+      case 200:
+        //  Store the result and increment the number of results received
+        _resultsMutex.lock();
+        _resultsList.append(result);
+        _totalResults++;
+        _resultsMutex.unlock();
+        //  Write out a "debug map" for each result that comes in
+        writeDebugMap(result, "bounded-reader-result");
+        break;
+      case 400:
+        //  Split the envelope in quarters and push them all back on the queue
+        {
+          double lon1 = envelope.getMinX();
+          double lon2 = envelope.getMinX() + envelope.getWidth() / 2.0f;
+          double lon3 = envelope.getMaxX();
+
+          double lat1 = envelope.getMinY();
+          double lat2 = envelope.getMinY() + envelope.getHeight() / 2.0f;
+          double lat3 = envelope.getMaxY();
+          _bboxMutex.lock();
+          //  Split the boxes into quads and push them onto the queue
+          _bboxes.push(geos::geom::Envelope(lon1, lon2, lat1, lat2));
+          _bboxes.push(geos::geom::Envelope(lon2, lon3, lat1, lat2));
+          _bboxes.push(geos::geom::Envelope(lon1, lon2, lat2, lat3));
+          _bboxes.push(geos::geom::Envelope(lon2, lon3, lat2, lat3));
+          //  Increment by three because 1 turned into 4, i.e. 3 more were added
+          _totalEnvelopes += 3;
+          _bboxMutex.unlock();
+        }
+        break;
+      case 509:
+        LOG_ERROR(request.getErrorString());
+        _fatalError = true;
+        break;
+      default:
+        LOG_ERROR("Unexpected Error: HTTP " << status << " : " << request.getErrorString());
+        _fatalError = true;
+        break;
+      }
+    }
+    else
+    {
+      //  Give-up the timeslice since there is no work to be done currently
+      _sleep();
+    }
+  }
+}
+
+void ParallelBoundedApiReader::writeDebugMap(const QString& data, const QString& name)
+{
+  if (ConfigOptions().getDebugMapsWrite())
+  {
+    //  Get the file extension based on the data type downloaded
+    QString ext;
+    switch (_dataType)
+    {
+    default:
+    case DataType::Text:      ext = "txt";      break;
+    case DataType::OsmXml:    ext = "osm";      break;
+    case DataType::Json:      ext = "json";     break;
+    case DataType::GeoJson:   ext = "geojson";  break;
+    }
+    //  Get the unique file number and increment it
+    _filenumberMutex.lock();
+    const QString filenumber = StringUtils::getNumberStringPaddedWithZeroes(_filenumber++, 3);
+    _filenumberMutex.unlock();
+    //  Write out the text to a uniquely named file
+    FileUtils::writeFully(QString("tmp/%1-%2.%3").arg(name).arg(filenumber).arg(ext), data);
+  }
+}
+
+}
Clone this wiki locally