Skip to content

v0.2.51..v0.2.52 changeset OsmApiWriter.cpp

Garret Voltz edited this page Jan 15, 2020 · 1 revision
diff --git a/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp b/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
index d55d06b..254c2c6 100644
--- a/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
+++ b/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
@@ -22,7 +22,7 @@
  * This will properly maintain the copyright information. DigitalGlobe
  * copyrights will be updated automatically.
  *
- * @copyright Copyright (C) 2018, 2019 DigitalGlobe (http://www.digitalglobe.com/)
+ * @copyright Copyright (C) 2018, 2019, 2020 DigitalGlobe (http://www.digitalglobe.com/)
  */
 
 #include "OsmApiWriter.h"
@@ -48,15 +48,17 @@ using namespace Tgs;
 namespace hoot
 {
 
-const char* OsmApiWriter::API_PATH_CAPABILITIES = "/api/capabilities/";
-const char* OsmApiWriter::API_PATH_PERMISSIONS = "/api/0.6/permissions/";
-const char* OsmApiWriter::API_PATH_CREATE_CHANGESET = "/api/0.6/changeset/create/";
-const char* OsmApiWriter::API_PATH_CLOSE_CHANGESET = "/api/0.6/changeset/%1/close/";
-const char* OsmApiWriter::API_PATH_UPLOAD_CHANGESET = "/api/0.6/changeset/%1/upload/";
-const char* OsmApiWriter::API_PATH_GET_ELEMENT = "/api/0.6/%1/%2/";
+const char* OsmApiWriter::API_PATH_CAPABILITIES = "/api/capabilities";  //  URL path constants for the API requests; none of them ends in a trailing slash
+const char* OsmApiWriter::API_PATH_PERMISSIONS = "/api/0.6/permissions";
+const char* OsmApiWriter::API_PATH_CREATE_CHANGESET = "/api/0.6/changeset/create";
+const char* OsmApiWriter::API_PATH_CLOSE_CHANGESET = "/api/0.6/changeset/%1/close";  //  %1 is presumably replaced with the changeset ID by the caller - confirm
+const char* OsmApiWriter::API_PATH_UPLOAD_CHANGESET = "/api/0.6/changeset/%1/upload";  //  %1 is presumably replaced with the changeset ID by the caller - confirm
+const char* OsmApiWriter::API_PATH_GET_ELEMENT = "/api/0.6/%1/%2";  //  %1/%2 are presumably the element type and element ID - confirm
 
 OsmApiWriter::OsmApiWriter(const QUrl &url, const QString &changeset)
   : _description(ConfigOptions().getChangesetDescription()),
+    _source(ConfigOptions().getChangesetSource()),
+    _hashtags(ConfigOptions().getChangesetHashtags()),
     _maxWriters(ConfigOptions().getChangesetApidbWritersMax()),
     _maxPushSize(ConfigOptions().getChangesetApidbSizeMax()),
     _maxChangesetSize(ConfigOptions().getChangesetMaxSize()),
@@ -77,6 +79,8 @@ OsmApiWriter::OsmApiWriter(const QUrl &url, const QString &changeset)
 OsmApiWriter::OsmApiWriter(const QUrl& url, const QList<QString>& changesets)
   : _changesets(changesets),
     _description(ConfigOptions().getChangesetDescription()),
+    _source(ConfigOptions().getChangesetSource()),
+    _hashtags(ConfigOptions().getChangesetHashtags()),
     _maxWriters(ConfigOptions().getChangesetApidbWritersMax()),
     _maxPushSize(ConfigOptions().getChangesetApidbSizeMax()),
     _maxChangesetSize(ConfigOptions().getChangesetMaxSize()),
@@ -136,15 +140,18 @@ bool OsmApiWriter::apply()
   //  Start the writer threads
   LOG_INFO("Starting " << _maxWriters << " processing threads.");
   for (int i = 0; i < _maxWriters; ++i)
-    _threadPool.push_back(thread(&OsmApiWriter::_changesetThreadFunc, this));
+  {
+    _threadStatus.push_back(ThreadStatus::Working);
+    _threadPool.push_back(thread(&OsmApiWriter::_changesetThreadFunc, this, i));
+  }
   //  Setup the progress indicators
   long total = _changeset.getTotalElementCount();
   float progress = 0.0f;
   float increment = 0.01f;
   //  Setup the increment
-  if (total < 100000)
+  if (total < 10000)
     increment = 0.1f;
-  else if (total < 1000000)
+  else if (total < 100000)
     increment = 0.05f;
   //  Iterate all changes until there are no more elements to send
   while (_changeset.hasElementsToSend())
@@ -172,6 +179,22 @@ bool OsmApiWriter::apply()
         _workQueue.push(changeset_info);
         _workQueueMutex.unlock();
       }
+      else if (queueSize == 0 && !newChangeset && _changeset.hasElementsToSend() && _threadsAreIdle())
+      {
+        //  This is an error case, the queue is empty, there are still elements to send but
+        //  all of the threads are idle and not waiting for something to come back
+        //  There are two things that can be done here, first is to put everything that is
+        //  "ready to send" in a changeset and send it OR move everything to the error state
+
+        //  Option #1: Get all of the remaining elements as a single changeset
+        _changesetMutex.lock();
+        _changeset.calculateRemainingChangeset(changeset_info);
+        _changesetMutex.unlock();
+        //  Push that changeset
+        _workQueueMutex.lock();
+        _workQueue.push(changeset_info);
+        _workQueueMutex.unlock();
+      }
       else
       {
         //  Allow time for the worker threads to complete some work
@@ -213,18 +236,24 @@ bool OsmApiWriter::apply()
   return success;
 }
 
-void OsmApiWriter::_changesetThreadFunc()
+void OsmApiWriter::_changesetThreadFunc(int index)
 {
+  //  Set the status to working
+  _threadStatusMutex.lock();
+  _threadStatus[index] = ThreadStatus::Working;
+  _threadStatusMutex.unlock();
   //  Setup the network request object with OAuth or with username/password authentication
   HootNetworkRequestPtr request = createNetworkRequest(true);
   long id = -1;
   long changesetSize = 0;
+  bool stop_thread = false;
   //  Iterate until all elements are sent and updated
-  while (!_changeset.isDone())
+  while (!_changeset.isDone() && !stop_thread)
   {
     ChangesetInfoPtr workInfo;
     //  Try to get something off of the work queue
     _workQueueMutex.lock();
+    int queueSize = _workQueue.size();
     if (!_workQueue.empty())
     {
       workInfo = _workQueue.front();
@@ -234,10 +263,14 @@ void OsmApiWriter::_changesetThreadFunc()
 
     if (workInfo)
     {
+      //  Set the status to working
+      _threadStatusMutex.lock();
+      _threadStatus[index] = ThreadStatus::Working;
+      _threadStatusMutex.unlock();
       //  Create the changeset ID if required
       if (id < 1)
       {
-        id = _createChangeset(request, _description);
+        id = _createChangeset(request, _description, _source, _hashtags);
         changesetSize = 0;
       }
       //  An ID of less than 1 isn't valid, try to fix it
@@ -282,14 +315,33 @@ void OsmApiWriter::_changesetThreadFunc()
       }
       else
       {
-        //  Log the error
-        LOG_ERROR("Error uploading changeset: " << id << "\t" << request->getErrorString());
+        //  Log the error as a status message
+        LOG_STATUS("Error uploading changeset: " << id << " - " << request->getErrorString() << " (" << request->getHttpStatus() << ")");
+        //  If this is the last changeset, error it all out and finish working
+        if (workInfo->getLast())
+        {
+          //  Fail the entire changeset
+          _changeset.updateFailedChangeset(workInfo, true);
+          //  Looping should end the thread because all of the remaining elements have now been set to the failed state
+          continue;
+        }
         //  Split the changeset on conflict errors
         switch (info->status)
         {
         case 409:   //  Conflict, check for version conflicts and fix, or split and continue
           {
-            if (_fixConflict(request, workInfo, info->response))
+            if (_changesetClosed(info->response))
+            {
+              //  The changeset was closed already so set the ID to -1 and reprocess
+              id = -1;
+              //  Push the changeset back on the queue
+              _workQueueMutex.lock();
+              _workQueue.push(workInfo);
+              _workQueueMutex.unlock();
+              //  Loop back around to work on the next changeset
+              continue;
+            }
+            else if (_fixConflict(request, workInfo, info->response))
             {
               _workQueueMutex.lock();
               _workQueue.push(workInfo);
@@ -304,39 +356,41 @@ void OsmApiWriter::_changesetThreadFunc()
         case 400:   //  Placeholder ID is missing or not unique
         case 404:   //  Diff contains elements where the given ID could not be found
         case 412:   //  Precondition Failed, Relation with id cannot be saved due to other member
+          if (!_splitChangeset(workInfo, info->response))
           {
-            _changesetMutex.lock();
-            ChangesetInfoPtr split = _changeset.splitChangeset(workInfo, info->response);
-            _changesetMutex.unlock();
-            if (split->size() > 0)
-            {
-              _workQueueMutex.lock();
-              _workQueue.push(split);
-              _workQueue.push(workInfo);
-              _workQueueMutex.unlock();
-            }
-            else
+            if (!workInfo->getAttemptedResolveChangesetIssues())
             {
-              if (!workInfo->getAttemptedResolveChangesetIssues())
+              //  Set the attempt issues resolved flag
+              workInfo->setAttemptedResolveChangesetIssues(true);
+              //  Try to automatically resolve certain issues, like out of date version
+              if (_resolveIssues(request, workInfo))
               {
-                //  Set the attempt issues resolved flag
-                workInfo->setAttemptedResolveChangesetIssues(true);
-                //  Try to automatically resolve certain issues, like out of date version
-                if (_resolveIssues(request, workInfo))
-                {
-                  _workQueueMutex.lock();
-                  _workQueue.push(workInfo);
-                  _workQueueMutex.unlock();
-                }
-                else
-                {
-                  //  Set the element in the changeset to failed because the issues couldn't be resolved
-                  _changeset.updateFailedChangeset(workInfo);
-                }
+                _workQueueMutex.lock();
+                _workQueue.push(workInfo);
+                _workQueueMutex.unlock();
+              }
+              else
+              {
+                //  Set the element in the changeset to failed because the issues couldn't be resolved
+                _changeset.updateFailedChangeset(workInfo);
               }
             }
           }
           break;
+        case 500:   //  Internal Server Error, could be caused by the database being saturated
+        case 502:   //  Bad Gateway, there are issues with the gateway, split and retry
+        case 504:   //  Gateway Timeout, server is taking too long, split and retry
+          if (!_splitChangeset(workInfo, info->response))
+          {
+            //  Splitting failed which means that the changeset only has one element in it,
+            //  push it back on the queue and give the API a break
+            _workQueueMutex.lock();
+            _workQueue.push(workInfo);
+            _workQueueMutex.unlock();
+            //  Sleep the thread
+            this_thread::sleep_for(chrono::milliseconds(10));
+          }
+          break;
         default:
           //  This is a big problem, report it and try again
           LOG_ERROR("Changeset upload responded with HTTP status response: " << request->getHttpStatus());
@@ -356,7 +410,23 @@ void OsmApiWriter::_changesetThreadFunc()
       }
     }
     else
-      this_thread::sleep_for(chrono::milliseconds(10));
+    {
+      if (!_changeset.hasElementsToSend() && queueSize == 0 && _threadsAreIdle())
+      {
+        //  In this case there are elements that have been sent and not reported back
+        //  BUT there are no threads that are waiting for them either
+        stop_thread = true;
+      }
+      else
+      {
+        //  Set the status to idle
+        _threadStatusMutex.lock();
+        _threadStatus[index] = ThreadStatus::Idle;
+        _threadStatusMutex.unlock();
+        //  Sleep the thread
+        this_thread::sleep_for(chrono::milliseconds(10));
+      }
+    }
   }
   //  Close the changeset if one is still open
   if (id != -1)
@@ -367,6 +437,8 @@ void OsmApiWriter::setConfiguration(const Settings& conf)
 {
   ConfigOptions options(conf);
   _description = options.getChangesetDescription();
+  _source = options.getChangesetSource();
+  _hashtags = options.getChangesetHashtags();
   _maxPushSize = options.getChangesetApidbSizeMax();
   _maxChangesetSize = options.getChangesetMaxSize();
   _maxWriters = options.getChangesetApidbWritersMax();
@@ -508,7 +580,10 @@ bool OsmApiWriter::_parsePermissions(const QString& permissions)
 }
 
 //  https://wiki.openstreetmap.org/wiki/API_v0.6#Create:_PUT_.2Fapi.2F0.6.2Fchangeset.2Fcreate
-long OsmApiWriter::_createChangeset(HootNetworkRequestPtr request, const QString& description)
+long OsmApiWriter::_createChangeset(HootNetworkRequestPtr request,
+                                    const QString& description,
+                                    const QString& source,
+                                    const QString& hashtags)
 {
   try
   {
@@ -519,8 +594,11 @@ long OsmApiWriter::_createChangeset(HootNetworkRequestPtr request, const QString
       "  <changeset>"
       "    <tag k='created_by' v='%1'/>"
       "    <tag k='comment' v='%2'/>"
+      "    <tag k='source' v='%3'/>"
+      "    <tag k='hashtags' v='%4'/>"
+      "    <tag k='bot' v='yes'/>"
       "  </changeset>"
-      "</osm>").arg(HOOT_NAME).arg(description);
+      "</osm>").arg(HOOT_NAME).arg(description).arg(source).arg(hashtags);
 
     request->networkRequest(changeset, QNetworkAccessManager::Operation::PutOperation, xml.toUtf8());
 
@@ -616,7 +694,7 @@ OsmApiWriter::OsmApiFailureInfoPtr OsmApiWriter::_uploadChangeset(HootNetworkReq
       info->success = true;
       break;
     case 400:
-      LOG_WARN("Changeset Upload Error: Error parsing XML changeset\n" << info->response);
+      LOG_WARN("Changeset Upload Error: Error parsing XML changeset - " << info->response);
       break;
     case 404:
       LOG_WARN("Unknown changeset or elements don't exist");
@@ -669,6 +747,11 @@ bool OsmApiWriter::_fixConflict(HootNetworkRequestPtr request, ChangesetInfoPtr
   return success;
 }
 
+bool OsmApiWriter::_changesetClosed(const QString &conflictExplanation)  //  Thin wrapper around the changeset's "closed" failure matcher
+{
+  return _changeset.matchesChangesetClosedFailure(conflictExplanation);  //  Result is passed through unchanged; presumably true when the text reports an already closed changeset - confirm
+}
+
 bool OsmApiWriter::_resolveIssues(HootNetworkRequestPtr request, ChangesetInfoPtr changeset)
 {
   bool success = false;
@@ -773,4 +856,43 @@ HootNetworkRequestPtr OsmApiWriter::createNetworkRequest(bool requiresAuthentica
   return request;
 }
 
+bool OsmApiWriter::_threadsAreIdle()  //  Returns true when no entry of _threadStatus is ThreadStatus::Working
+{
+  bool response = true;  //  Stays true when _threadStatus is empty
+  //  Lock the thread status mutex only once
+  _threadStatusMutex.lock();
+  for (vector<ThreadStatus>::iterator it = _threadStatus.begin(); it != _threadStatus.end(); ++it)
+  {
+    //  It only takes one thread working to return false
+    if (*it == ThreadStatus::Working)
+    {
+      response = false;
+      break;  //  Leaves the loop only, the unlock below still runs
+    }
+  }
+  //  Unlock the thread status mutex
+  _threadStatusMutex.unlock();
+  return response;
+}
+
+bool OsmApiWriter::_splitChangeset(const ChangesetInfoPtr& workInfo, const QString& response)  //  Returns true when a split happened and both parts were queued
+{
+  //  Try to split the changeset in half
+  _changesetMutex.lock();
+  ChangesetInfoPtr split = _changeset.splitChangeset(workInfo, response);  //  presumably moves part of workInfo into the returned changeset - confirm
+  _changesetMutex.unlock();
+  if (split->size() > 0)  //  NOTE(review): no null check here, assumes splitChangeset() always returns a valid pointer - confirm
+  {
+    //  Push both of the changesets onto the queue
+    _workQueueMutex.lock();
+    _workQueue.push(split);
+    _workQueue.push(workInfo);
+    _workQueueMutex.unlock();
+    return true;
+  }
+  //  Nothing was split out, return false
+  return false;  //  workInfo is not re-queued on this path, the caller decides what happens to it
+}
+
+
 }
Clone this wiki locally