Skip to content

v0.2.54..v0.2.55 changeset OsmApiWriter.cpp

Garret Voltz edited this page Aug 14, 2020 · 1 revision
diff --git a/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp b/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
index 7842b74..9a06144 100644
--- a/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
+++ b/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
@@ -51,7 +51,8 @@ namespace hoot
 {
 
 OsmApiWriter::OsmApiWriter(const QUrl &url, const QString &changeset)
-  : _description(ConfigOptions().getChangesetDescription()),
+  : _startFlag(false),
+    _description(ConfigOptions().getChangesetDescription()),
     _source(ConfigOptions().getChangesetSource()),
     _hashtags(ConfigOptions().getChangesetHashtags()),
     _maxWriters(ConfigOptions().getChangesetApidbWritersMax()),
@@ -67,7 +68,8 @@ OsmApiWriter::OsmApiWriter(const QUrl &url, const QString &changeset)
     _changesetCount(0),
     _debugOutput(ConfigOptions().getChangesetApidbWriterDebugOutput()),
     _debugOutputPath(ConfigOptions().getChangesetApidbWriterDebugOutputPath()),
-    _apiId(0)
+    _apiId(0),
+    _threadsCanExit(false)
 {
   _changesets.push_back(changeset);
   if (isSupported(url))
@@ -75,7 +77,8 @@ OsmApiWriter::OsmApiWriter(const QUrl &url, const QString &changeset)
 }
 
 OsmApiWriter::OsmApiWriter(const QUrl& url, const QList<QString>& changesets)
-  : _changesets(changesets),
+  : _startFlag(false),
+    _changesets(changesets),
     _description(ConfigOptions().getChangesetDescription()),
     _source(ConfigOptions().getChangesetSource()),
     _hashtags(ConfigOptions().getChangesetHashtags()),
@@ -92,7 +95,8 @@ OsmApiWriter::OsmApiWriter(const QUrl& url, const QList<QString>& changesets)
     _changesetCount(0),
     _debugOutput(ConfigOptions().getChangesetApidbWriterDebugOutput()),
     _debugOutputPath(ConfigOptions().getChangesetApidbWriterDebugOutputPath()),
-    _apiId(0)
+    _apiId(0),
+    _threadsCanExit(false)
 {
   if (isSupported(url))
     _url = url;
@@ -140,12 +144,12 @@ bool OsmApiWriter::apply()
   _changeset.fixMalformedInput();
   //  Start the writer threads
   LOG_INFO("Starting " << _maxWriters << " processing threads.");
+  _threadIdle.reserve(_maxWriters);
   for (int i = 0; i < _maxWriters; ++i)
   {
     _threadStatus.push_back(ThreadStatus::Working);
     _threadPool.push_back(thread(&OsmApiWriter::_changesetThreadFunc, this, i));
   }
-  _threadIdle.reserve(_maxWriters);
   //  Setup the progress indicators
   long total = _changeset.getTotalElementCount();
   float progress = 0.0f;
@@ -162,6 +166,13 @@ bool OsmApiWriter::apply()
     _workQueueMutex.lock();
     int queueSize = (int)_workQueue.size();
     _workQueueMutex.unlock();
+    //  If all threads have failed, fail the rest of the changeset and exit
+    if (_allThreadsFailed())
+    {
+      _changeset.failRemainingChangeset();
+      _threadsCanExit = true;
+      break;
+    }
     //  Only queue up enough work to keep all the threads busy with times QUEUE_SIZE_MULTIPLIER
     //  so that results can come back and update the changeset for more atomic changesets instead
     //  of a big list of nodes, then ways, then relations.  This will give us fuller more atomic
@@ -177,9 +188,8 @@ bool OsmApiWriter::apply()
       //  Add the new work to the queue if there is any
       if (newChangeset)
       {
-        _workQueueMutex.lock();
-        _workQueue.push(changeset_info);
-        _workQueueMutex.unlock();
+        //  Push the new changeset onto the work queue
+        _pushChangesets(changeset_info);
       }
       else if (queueSize == 0 && !newChangeset && _changeset.hasElementsToSend() && _threadsAreIdle())
       {
@@ -187,26 +197,39 @@ bool OsmApiWriter::apply()
         //  all of the threads are idle and not waiting for something to come back
         //  There are two things that can be done here, first is to put everything that is
         //  "ready to send" in a changeset and send it OR move everything to the error state
-
+/*
         //  Option #1: Get all of the remaining elements as a single changeset
         _changesetMutex.lock();
         _changeset.calculateRemainingChangeset(changeset_info);
         _changesetMutex.unlock();
         //  Push that changeset
-        _workQueueMutex.lock();
-        _workQueue.push(changeset_info);
-        _workQueueMutex.unlock();
+        _pushChangesets(changeset_info);
+        //  Let the threads know that the remaining changeset is the "remaining" changeset
+        _threadsCanExit = true;
+*/
+        LOG_STATUS("Apply Changeset: Remaining elements unsendable...");
+        //  Option #2: Move everything to the error state and exit
+        _changesetMutex.lock();
+        _changeset.failRemainingChangeset();
+        _changesetMutex.unlock();
+        //  Let the threads know that the remaining changeset has failed
+        _threadsCanExit = true;
+        break;
       }
       else
       {
+        //  Indicate to the worker threads that there is work to be done
+        _startWork();
         //  Allow time for the worker threads to complete some work
-        this_thread::sleep_for(chrono::milliseconds(10));
+        this_thread::yield();
       }
     }
     else
     {
+      //  Indicate to the worker threads that there is work to be done
+      _startWork();
       //  Allow time for the worker threads to complete some work
-      this_thread::sleep_for(chrono::milliseconds(10));
+      this_thread::yield();
     }
     //  Show the progress
     if (_showProgress)
@@ -220,10 +243,19 @@ bool OsmApiWriter::apply()
       }
     }
   }
+  //  Indicate to the worker threads that there is work to be done, if they haven't started already
+  _startWork();
   //  Wait for the threads to shutdown
   for (int i = 0; i < _maxWriters; ++i)
     _threadPool[i].join();
-  LOG_INFO("Upload progress: 100%");
+  //  Check for failed threads
+  if (_hasFailedThread())
+  {
+    LOG_ERROR("Multiple bad changeset ID errors in a row, is the API functioning correctly?");
+    _changeset.failRemainingChangeset();
+  }
+  //  Final write for the error file
+  _changeset.writeErrorFile();
   //  Keep some stats
   _stats.append(SingleStat("API Upload Time (sec)", timer.getElapsedAndRestart()));
   _stats.append(SingleStat("Total OSM Changesets Uploaded", _changesetCount));
@@ -241,14 +273,15 @@ bool OsmApiWriter::apply()
 void OsmApiWriter::_changesetThreadFunc(int index)
 {
   //  Set the status to working
-  _threadStatusMutex.lock();
-  _threadStatus[index] = ThreadStatus::Working;
-  _threadStatusMutex.unlock();
+  _updateThreadStatus(index, ThreadStatus::Working);
   //  Setup the network request object with OAuth or with username/password authentication
   HootNetworkRequestPtr request = createNetworkRequest(true);
   long id = -1;
   long changesetSize = 0;
   bool stop_thread = false;
+  int changeset_failures = 0;
+  //  Before working, wait for the signal
+  _waitForStart();
   //  Iterate until all elements are sent and updated
   while (!_changeset.isDone() && !stop_thread)
   {
@@ -266,9 +299,7 @@ void OsmApiWriter::_changesetThreadFunc(int index)
     if (workInfo)
     {
       //  Set the status to working
-      _threadStatusMutex.lock();
-      _threadStatus[index] = ThreadStatus::Working;
-      _threadStatusMutex.unlock();
+      _updateThreadStatus(index, ThreadStatus::Working);
       //  Create the changeset ID if required
       if (id < 1)
       {
@@ -278,16 +309,30 @@ void OsmApiWriter::_changesetThreadFunc(int index)
       //  An ID of less than 1 isn't valid, try to fix it
       if (id < 1)
       {
-        _workQueueMutex.lock();
-        _workQueue.push(workInfo);
-        _workQueueMutex.unlock();
-        //  Reset the network request object and sleep it off
-        request = createNetworkRequest(true);
-        LOG_WARN("Bad changeset ID. Resetting network request object.");
-        this_thread::sleep_for(chrono::milliseconds(100));
+        _pushChangesets(workInfo);
+        //  Multiple changeset failures
+        changeset_failures++;
+        if (changeset_failures >= 3)
+        {
+          //  Set the thread status to failed and report the error message in the main thread
+          _updateThreadStatus(index, ThreadStatus::Failed);
+          stop_thread = true;
+        }
+        else
+        {
+          //  Reset the network request object and sleep it off
+          request = createNetworkRequest(true);
+          LOG_DEBUG("Bad changeset ID. Resetting network request object.");
+          this_thread::yield();
+        }
         //  Try a new create changeset request
         continue;
       }
+      else
+      {
+        //  Reset the changeset failure count when one is successful
+        changeset_failures = 0;
+      }
       //  Make sure that the changeset is valid and isn't empty
       if (workInfo->size() < 1)
       {
@@ -339,7 +384,10 @@ void OsmApiWriter::_changesetThreadFunc(int index)
         {
           //  Fail the entire changeset
           _changeset.updateFailedChangeset(workInfo, true);
+          //  Let the threads know that the remaining changeset is the "remaining" changeset
+          _threadsCanExit = true;
           //  Looping should end the thread because all of the remaining elements have now been set to the failed state
+          stop_thread = true;
           continue;
         }
         //  Split the changeset on conflict errors
@@ -352,29 +400,21 @@ void OsmApiWriter::_changesetThreadFunc(int index)
               if ((int)workInfo->size() > _maxChangesetSize / 2)
               {
                 //  Split the changeset into half so that it is smaller and won't fail
-                ChangesetInfoPtr split = _changeset.splitChangeset(workInfo);
-                _workQueueMutex.lock();
-                _workQueue.push(workInfo);
-                _workQueue.push(split);
-                _workQueueMutex.unlock();
+                _splitChangeset(workInfo);
               }
               else
               {
                 //  The changeset was closed already so set the ID to -1 and reprocess
                 id = -1;
                 //  Push the changeset back on the queue
-                _workQueueMutex.lock();
-                _workQueue.push(workInfo);
-                _workQueueMutex.unlock();
+                _pushChangesets(workInfo);
               }
               //  Loop back around to work on the next changeset
               continue;
             }
             else if (_fixConflict(request, workInfo, info->response))
             {
-              _workQueueMutex.lock();
-              _workQueue.push(workInfo);
-              _workQueueMutex.unlock();
+              _pushChangesets(workInfo);
               //  Loop back around to work on the next changeset
               continue;
             }
@@ -394,9 +434,7 @@ void OsmApiWriter::_changesetThreadFunc(int index)
               //  Try to automatically resolve certain issues, like out of date version
               if (_resolveIssues(request, workInfo))
               {
-                _workQueueMutex.lock();
-                _workQueue.push(workInfo);
-                _workQueueMutex.unlock();
+                _pushChangesets(workInfo);
               }
               else
               {
@@ -413,11 +451,9 @@ void OsmApiWriter::_changesetThreadFunc(int index)
           {
             //  Splitting failed which means that the changeset only has one element in it,
             //  push it back on the queue and give the API a break
-            _workQueueMutex.lock();
-            _workQueue.push(workInfo);
-            _workQueueMutex.unlock();
+            _pushChangesets(workInfo);
             //  Sleep the thread
-            this_thread::sleep_for(chrono::milliseconds(10));
+            this_thread::sleep_for(chrono::milliseconds(100));
           }
           break;
         default:
@@ -425,14 +461,11 @@ void OsmApiWriter::_changesetThreadFunc(int index)
           LOG_ERROR("Changeset upload responded with HTTP status response: " << request->getHttpStatus());
           //  Fall through
         case HttpResponseCode::HTTP_METHOD_NOT_ALLOWED:
+        case HttpResponseCode::HTTP_UNAUTHORIZED:
           //  This shouldn't ever happen, push back on the queue, only process a certain amount of times
           workInfo->retry();
           if (workInfo->canRetry())
-          {
-            _workQueueMutex.lock();
-            _workQueue.push(workInfo);
-            _workQueueMutex.unlock();
-          }
+            _pushChangesets(workInfo);
           else
             _changeset.updateFailedChangeset(workInfo, true);
           break;
@@ -441,7 +474,12 @@ void OsmApiWriter::_changesetThreadFunc(int index)
     }
     else
     {
-      if (!_changeset.hasElementsToSend() && !_changeset.isDone() && queueSize == 0)
+      if (_threadsCanExit)
+      {
+        stop_thread = true;
+        _updateThreadStatus(index, ThreadStatus::Completed);
+      }
+      else if (!_changeset.isDone() && queueSize == 0)
       {
         //  This is a bad state where the producer thread says all elements are sent and
         //  waits for all threads to join but the changeset isn't "done".
@@ -459,40 +497,23 @@ void OsmApiWriter::_changesetThreadFunc(int index)
           id = -1;
         }
         _threadStatusMutex.unlock();
-        if (_threadsAreIdle())
-        {
-          //  In this case there are elements that have been sent and not reported back
-          //  BUT there are no threads that are waiting for them either.  Every thread
-          //  except the "first" worker thread will exit here.  The first worker thread
-          //  Tries to calculate the remaining changeset and push in on the queue.  It then
-          //  loops around and picks up the remaining changeset to process.  Next time around
-          //  calculateRemainingChangeset() fails and this thread is stopped.
-          if (index != 0)
-            stop_thread = true;
-          else if (_changeset.calculateRemainingChangeset(workInfo))
-          {
-            _workQueueMutex.lock();
-            _workQueue.push(workInfo);
-            _workQueueMutex.unlock();
-          }
-          else
-            stop_thread = true;
-        }
       }
       else
       {
         //  Set the status to idle
-        _threadStatusMutex.lock();
-        _threadStatus[index] = ThreadStatus::Idle;
-        _threadStatusMutex.unlock();
-        //  Sleep the thread
-        this_thread::sleep_for(chrono::milliseconds(10));
+        _updateThreadStatus(index, ThreadStatus::Idle);
+        //  Yield the thread
+        this_thread::yield();
       }
     }
   }
   //  Close the changeset if one is still open
   if (id != -1)
     _closeChangeset(request, id);
+  //  Update the thread to complete if it didn't fail
+  ThreadStatus status = _getThreadStatus(index);
+  if (status != ThreadStatus::Failed && status != ThreadStatus::Unknown)
+    _updateThreadStatus(index, ThreadStatus::Completed);
 }
 
 void OsmApiWriter::setConfiguration(const Settings& conf)
@@ -675,7 +696,9 @@ long OsmApiWriter::_createChangeset(HootNetworkRequestPtr request,
 
     QString responseXml = QString::fromUtf8(request->getResponseContent().data());
 
-    return responseXml.toLong();
+    //  Only return the parsed response from HTTP 200 OK
+    if (request->getHttpStatus() == HttpResponseCode::HTTP_OK)
+      return responseXml.toLong();
   }
   catch (const HootException& ex)
   {
@@ -956,12 +979,19 @@ bool OsmApiWriter::_splitChangeset(const ChangesetInfoPtr& workInfo, const QStri
   _changesetMutex.unlock();
   if (split->size() > 0)
   {
-    //  Push both of the changesets onto the queue
-    _workQueueMutex.lock();
-    _workQueue.push(split);
-    _workQueue.push(workInfo);
-    _workQueueMutex.unlock();
-    return true;
+    //  Fail the split changeset if the error flag is set
+    if (split->getError())
+    {
+      _changeset.failChangeset(split);
+      _pushChangesets(workInfo);
+      return true;
+    }
+    else
+    {
+      //  Push both of the changesets onto the queue
+      _pushChangesets(workInfo, split);
+      return true;
+    }
   }
   //  Nothing was split out, return false
   return false;
@@ -987,4 +1017,74 @@ int OsmApiWriter::_getNextApiId()
   return ++_apiId;
 }
 
+void OsmApiWriter::_pushChangesets(ChangesetInfoPtr changeset, ChangesetInfoPtr changeset2)
+{
+  std::lock_guard<std::mutex> lock(_workQueueMutex);
+  if (changeset)
+    _workQueue.push(changeset);
+  if (changeset2)
+    _workQueue.push(changeset2);
+}
+
+void OsmApiWriter::_startWork()
+{
+  std::lock_guard<std::mutex> lock(_startMutex);
+  _startFlag = true;
+  _start.notify_all();
+}
+
+void OsmApiWriter::_waitForStart()
+{
+  std::unique_lock<std::mutex> lock(_startMutex);
+  _start.wait(lock, [this]{ return _startFlag; });
+}
+
+OsmApiWriter::ThreadStatus OsmApiWriter::_getThreadStatus(int thread_index)
+{
+  //  Validate the index
+  if (thread_index < 0 || thread_index >= (int)_threadStatus.size())
+    return ThreadStatus::Unknown;
+  //  Lock the mutex and return the status
+  std::lock_guard<std::mutex> lock(_threadStatusMutex);
+  return _threadStatus[thread_index];
+}
+
+
+void OsmApiWriter::_updateThreadStatus(int thread_index, ThreadStatus status)
+{
+  //  Validate the index
+  if (thread_index < 0 || thread_index >= (int)_threadStatus.size())
+    return;
+  //  Lock the mutex and update the status
+  std::lock_guard<std::mutex> lock(_threadStatusMutex);
+  _threadStatus[thread_index] = status;
+}
+
+bool OsmApiWriter::_allThreadsFailed()
+{
+  std::lock_guard<std::mutex> lock(_threadStatusMutex);
+  for (size_t i = 0; i < _threadStatus.size(); ++i)
+  {
+    //  Short circuit the loop if a thread isn't in the failed state
+    if (_threadStatus[i] != ThreadStatus::Failed)
+      return false;
+  }
+  //  All threads are in the failed state
+  return true;
+}
+
+bool OsmApiWriter::_hasFailedThread()
+{
+  std::lock_guard<std::mutex> lock(_threadStatusMutex);
+  for (size_t i = 0; i < _threadStatus.size(); ++i)
+  {
+    //  Short circuit the loop if a thread is in the failed state
+    if (_threadStatus[i] == ThreadStatus::Failed)
+      return true;
+  }
+  //  All threads are in the non-failed state
+  return false;
+}
+
+
 }
Clone this wiki locally