-
Notifications
You must be signed in to change notification settings - Fork 74
v0.2.51..v0.2.52 changeset OsmApiWriter.cpp
Garret Voltz edited this page Jan 15, 2020
·
1 revision
diff --git a/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp b/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
index d55d06b..254c2c6 100644
--- a/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
+++ b/hoot-core/src/main/cpp/hoot/core/io/OsmApiWriter.cpp
@@ -22,7 +22,7 @@
* This will properly maintain the copyright information. DigitalGlobe
* copyrights will be updated automatically.
*
- * @copyright Copyright (C) 2018, 2019 DigitalGlobe (http://www.digitalglobe.com/)
+ * @copyright Copyright (C) 2018, 2019, 2020 DigitalGlobe (http://www.digitalglobe.com/)
*/
#include "OsmApiWriter.h"
@@ -48,15 +48,17 @@ using namespace Tgs;
namespace hoot
{
-const char* OsmApiWriter::API_PATH_CAPABILITIES = "/api/capabilities/";
-const char* OsmApiWriter::API_PATH_PERMISSIONS = "/api/0.6/permissions/";
-const char* OsmApiWriter::API_PATH_CREATE_CHANGESET = "/api/0.6/changeset/create/";
-const char* OsmApiWriter::API_PATH_CLOSE_CHANGESET = "/api/0.6/changeset/%1/close/";
-const char* OsmApiWriter::API_PATH_UPLOAD_CHANGESET = "/api/0.6/changeset/%1/upload/";
-const char* OsmApiWriter::API_PATH_GET_ELEMENT = "/api/0.6/%1/%2/";
+const char* OsmApiWriter::API_PATH_CAPABILITIES = "/api/capabilities";
+const char* OsmApiWriter::API_PATH_PERMISSIONS = "/api/0.6/permissions";
+const char* OsmApiWriter::API_PATH_CREATE_CHANGESET = "/api/0.6/changeset/create";
+const char* OsmApiWriter::API_PATH_CLOSE_CHANGESET = "/api/0.6/changeset/%1/close";
+const char* OsmApiWriter::API_PATH_UPLOAD_CHANGESET = "/api/0.6/changeset/%1/upload";
+const char* OsmApiWriter::API_PATH_GET_ELEMENT = "/api/0.6/%1/%2";
OsmApiWriter::OsmApiWriter(const QUrl &url, const QString &changeset)
: _description(ConfigOptions().getChangesetDescription()),
+ _source(ConfigOptions().getChangesetSource()),
+ _hashtags(ConfigOptions().getChangesetHashtags()),
_maxWriters(ConfigOptions().getChangesetApidbWritersMax()),
_maxPushSize(ConfigOptions().getChangesetApidbSizeMax()),
_maxChangesetSize(ConfigOptions().getChangesetMaxSize()),
@@ -77,6 +79,8 @@ OsmApiWriter::OsmApiWriter(const QUrl &url, const QString &changeset)
OsmApiWriter::OsmApiWriter(const QUrl& url, const QList<QString>& changesets)
: _changesets(changesets),
_description(ConfigOptions().getChangesetDescription()),
+ _source(ConfigOptions().getChangesetSource()),
+ _hashtags(ConfigOptions().getChangesetHashtags()),
_maxWriters(ConfigOptions().getChangesetApidbWritersMax()),
_maxPushSize(ConfigOptions().getChangesetApidbSizeMax()),
_maxChangesetSize(ConfigOptions().getChangesetMaxSize()),
@@ -136,15 +140,18 @@ bool OsmApiWriter::apply()
// Start the writer threads
LOG_INFO("Starting " << _maxWriters << " processing threads.");
for (int i = 0; i < _maxWriters; ++i)
- _threadPool.push_back(thread(&OsmApiWriter::_changesetThreadFunc, this));
+ {
+ _threadStatus.push_back(ThreadStatus::Working);
+ _threadPool.push_back(thread(&OsmApiWriter::_changesetThreadFunc, this, i));
+ }
// Setup the progress indicators
long total = _changeset.getTotalElementCount();
float progress = 0.0f;
float increment = 0.01f;
// Setup the increment
- if (total < 100000)
+ if (total < 10000)
increment = 0.1f;
- else if (total < 1000000)
+ else if (total < 100000)
increment = 0.05f;
// Iterate all changes until there are no more elements to send
while (_changeset.hasElementsToSend())
@@ -172,6 +179,22 @@ bool OsmApiWriter::apply()
_workQueue.push(changeset_info);
_workQueueMutex.unlock();
}
+ else if (queueSize == 0 && !newChangeset && _changeset.hasElementsToSend() && _threadsAreIdle())
+ {
+ // This is an error case, the queue is empty, there are still elements to send but
+ // all of the threads are idle and not waiting for something to come back
+ // There are two things that can be done here, first is to put everything that is
+ // "ready to send" in a changeset and send it OR move everything to the error state
+
+ // Option #1: Get all of the remaining elements as a single changeset
+ _changesetMutex.lock();
+ _changeset.calculateRemainingChangeset(changeset_info);
+ _changesetMutex.unlock();
+ // Push that changeset
+ _workQueueMutex.lock();
+ _workQueue.push(changeset_info);
+ _workQueueMutex.unlock();
+ }
else
{
// Allow time for the worker threads to complete some work
@@ -213,18 +236,24 @@ bool OsmApiWriter::apply()
return success;
}
-void OsmApiWriter::_changesetThreadFunc()
+void OsmApiWriter::_changesetThreadFunc(int index)
{
+ // Set the status to working
+ _threadStatusMutex.lock();
+ _threadStatus[index] = ThreadStatus::Working;
+ _threadStatusMutex.unlock();
// Setup the network request object with OAuth or with username/password authentication
HootNetworkRequestPtr request = createNetworkRequest(true);
long id = -1;
long changesetSize = 0;
+ bool stop_thread = false;
// Iterate until all elements are sent and updated
- while (!_changeset.isDone())
+ while (!_changeset.isDone() && !stop_thread)
{
ChangesetInfoPtr workInfo;
// Try to get something off of the work queue
_workQueueMutex.lock();
+ int queueSize = _workQueue.size();
if (!_workQueue.empty())
{
workInfo = _workQueue.front();
@@ -234,10 +263,14 @@ void OsmApiWriter::_changesetThreadFunc()
if (workInfo)
{
+ // Set the status to working
+ _threadStatusMutex.lock();
+ _threadStatus[index] = ThreadStatus::Working;
+ _threadStatusMutex.unlock();
// Create the changeset ID if required
if (id < 1)
{
- id = _createChangeset(request, _description);
+ id = _createChangeset(request, _description, _source, _hashtags);
changesetSize = 0;
}
// An ID of less than 1 isn't valid, try to fix it
@@ -282,14 +315,33 @@ void OsmApiWriter::_changesetThreadFunc()
}
else
{
- // Log the error
- LOG_ERROR("Error uploading changeset: " << id << "\t" << request->getErrorString());
+ // Log the error as a status message
+ LOG_STATUS("Error uploading changeset: " << id << " - " << request->getErrorString() << " (" << request->getHttpStatus() << ")");
+ // If this is the last changeset, error it all out and finish working
+ if (workInfo->getLast())
+ {
+ // Fail the entire changeset
+ _changeset.updateFailedChangeset(workInfo, true);
+ // Looping should end the thread because all of the remaining elements have now been set to the failed state
+ continue;
+ }
// Split the changeset on conflict errors
switch (info->status)
{
case 409: // Conflict, check for version conflicts and fix, or split and continue
{
- if (_fixConflict(request, workInfo, info->response))
+ if (_changesetClosed(info->response))
+ {
+ // The changeset was closed already so set the ID to -1 and reprocess
+ id = -1;
+ // Push the changeset back on the queue
+ _workQueueMutex.lock();
+ _workQueue.push(workInfo);
+ _workQueueMutex.unlock();
+ // Loop back around to work on the next changeset
+ continue;
+ }
+ else if (_fixConflict(request, workInfo, info->response))
{
_workQueueMutex.lock();
_workQueue.push(workInfo);
@@ -304,39 +356,41 @@ void OsmApiWriter::_changesetThreadFunc()
case 400: // Placeholder ID is missing or not unique
case 404: // Diff contains elements where the given ID could not be found
case 412: // Precondition Failed, Relation with id cannot be saved due to other member
+ if (!_splitChangeset(workInfo, info->response))
{
- _changesetMutex.lock();
- ChangesetInfoPtr split = _changeset.splitChangeset(workInfo, info->response);
- _changesetMutex.unlock();
- if (split->size() > 0)
- {
- _workQueueMutex.lock();
- _workQueue.push(split);
- _workQueue.push(workInfo);
- _workQueueMutex.unlock();
- }
- else
+ if (!workInfo->getAttemptedResolveChangesetIssues())
{
- if (!workInfo->getAttemptedResolveChangesetIssues())
+ // Set the attempt issues resolved flag
+ workInfo->setAttemptedResolveChangesetIssues(true);
+ // Try to automatically resolve certain issues, like out of date version
+ if (_resolveIssues(request, workInfo))
{
- // Set the attempt issues resolved flag
- workInfo->setAttemptedResolveChangesetIssues(true);
- // Try to automatically resolve certain issues, like out of date version
- if (_resolveIssues(request, workInfo))
- {
- _workQueueMutex.lock();
- _workQueue.push(workInfo);
- _workQueueMutex.unlock();
- }
- else
- {
- // Set the element in the changeset to failed because the issues couldn't be resolved
- _changeset.updateFailedChangeset(workInfo);
- }
+ _workQueueMutex.lock();
+ _workQueue.push(workInfo);
+ _workQueueMutex.unlock();
+ }
+ else
+ {
+ // Set the element in the changeset to failed because the issues couldn't be resolved
+ _changeset.updateFailedChangeset(workInfo);
}
}
}
break;
+ case 500: // Internal Server Error, could be caused by the database being saturated
+ case 502: // Bad Gateway, there are issues with the gateway, split and retry
+ case 504: // Gateway Timeout, server is taking too long, split and retry
+ if (!_splitChangeset(workInfo, info->response))
+ {
+ // Splitting failed which means that the changeset only has one element in it,
+ // push it back on the queue and give the API a break
+ _workQueueMutex.lock();
+ _workQueue.push(workInfo);
+ _workQueueMutex.unlock();
+ // Sleep the thread
+ this_thread::sleep_for(chrono::milliseconds(10));
+ }
+ break;
default:
// This is a big problem, report it and try again
LOG_ERROR("Changeset upload responded with HTTP status response: " << request->getHttpStatus());
@@ -356,7 +410,23 @@ void OsmApiWriter::_changesetThreadFunc()
}
}
else
- this_thread::sleep_for(chrono::milliseconds(10));
+ {
+ if (!_changeset.hasElementsToSend() && queueSize == 0 && _threadsAreIdle())
+ {
+ // In this case there are elements that have been sent and not reported back
+ // BUT there are no threads that are waiting for them either
+ stop_thread = true;
+ }
+ else
+ {
+ // Set the status to idle
+ _threadStatusMutex.lock();
+ _threadStatus[index] = ThreadStatus::Idle;
+ _threadStatusMutex.unlock();
+ // Sleep the thread
+ this_thread::sleep_for(chrono::milliseconds(10));
+ }
+ }
}
// Close the changeset if one is still open
if (id != -1)
@@ -367,6 +437,8 @@ void OsmApiWriter::setConfiguration(const Settings& conf)
{
ConfigOptions options(conf);
_description = options.getChangesetDescription();
+ _source = options.getChangesetSource();
+ _hashtags = options.getChangesetHashtags();
_maxPushSize = options.getChangesetApidbSizeMax();
_maxChangesetSize = options.getChangesetMaxSize();
_maxWriters = options.getChangesetApidbWritersMax();
@@ -508,7 +580,10 @@ bool OsmApiWriter::_parsePermissions(const QString& permissions)
}
// https://wiki.openstreetmap.org/wiki/API_v0.6#Create:_PUT_.2Fapi.2F0.6.2Fchangeset.2Fcreate
-long OsmApiWriter::_createChangeset(HootNetworkRequestPtr request, const QString& description)
+long OsmApiWriter::_createChangeset(HootNetworkRequestPtr request,
+ const QString& description,
+ const QString& source,
+ const QString& hashtags)
{
try
{
@@ -519,8 +594,11 @@ long OsmApiWriter::_createChangeset(HootNetworkRequestPtr request, const QString
" <changeset>"
" <tag k='created_by' v='%1'/>"
" <tag k='comment' v='%2'/>"
+ " <tag k='source' v='%3'/>"
+ " <tag k='hashtags' v='%4'/>"
+ " <tag k='bot' v='yes'/>"
" </changeset>"
- "</osm>").arg(HOOT_NAME).arg(description);
+ "</osm>").arg(HOOT_NAME).arg(description).arg(source).arg(hashtags);
request->networkRequest(changeset, QNetworkAccessManager::Operation::PutOperation, xml.toUtf8());
@@ -616,7 +694,7 @@ OsmApiWriter::OsmApiFailureInfoPtr OsmApiWriter::_uploadChangeset(HootNetworkReq
info->success = true;
break;
case 400:
- LOG_WARN("Changeset Upload Error: Error parsing XML changeset\n" << info->response);
+ LOG_WARN("Changeset Upload Error: Error parsing XML changeset - " << info->response);
break;
case 404:
LOG_WARN("Unknown changeset or elements don't exist");
@@ -669,6 +747,11 @@ bool OsmApiWriter::_fixConflict(HootNetworkRequestPtr request, ChangesetInfoPtr
return success;
}
+/**
+ * @brief Checks whether a conflict explanation reports an already-closed changeset
+ * @param conflictExplanation Text to test; forwarded unchanged to the changeset object
+ * @return The result of `_changeset.matchesChangesetClosedFailure()` for that text
+ */
+bool OsmApiWriter::_changesetClosed(const QString &conflictExplanation)
+{
+  // The pattern matching itself lives in the changeset object; this is only a delegate
+  const bool closed = _changeset.matchesChangesetClosedFailure(conflictExplanation);
+  return closed;
+}
+
bool OsmApiWriter::_resolveIssues(HootNetworkRequestPtr request, ChangesetInfoPtr changeset)
{
bool success = false;
@@ -773,4 +856,43 @@ HootNetworkRequestPtr OsmApiWriter::createNetworkRequest(bool requiresAuthentica
return request;
}
+/**
+ * @brief Reports whether no entry in `_threadStatus` is marked as working
+ * @return false as soon as one entry equals `ThreadStatus::Working`, true otherwise
+ *  (including when `_threadStatus` is empty)
+ */
+bool OsmApiWriter::_threadsAreIdle()
+{
+  // Assume every thread is idle until a working one is seen
+  bool allIdle = true;
+  // Hold the status mutex for the whole scan so it is taken exactly once
+  _threadStatusMutex.lock();
+  for (const ThreadStatus& status : _threadStatus)
+  {
+    if (status == ThreadStatus::Working)
+    {
+      // A single working thread settles the answer, stop scanning
+      allIdle = false;
+      break;
+    }
+  }
+  _threadStatusMutex.unlock();
+  return allIdle;
+}
+
+/**
+ * @brief Splits a changeset and queues both parts for another upload attempt
+ * @param workInfo Changeset to split; pushed back onto the work queue after the split-off part
+ * @param response Response text handed to `_changeset.splitChangeset()` along with `workInfo`
+ * @return true if the split produced a non-empty changeset and both parts were queued,
+ *  false if nothing was split out (in which case nothing is queued)
+ */
+bool OsmApiWriter::_splitChangeset(const ChangesetInfoPtr& workInfo, const QString& response)
+{
+  // The split is performed while holding the changeset mutex
+  _changesetMutex.lock();
+  ChangesetInfoPtr splitOff = _changeset.splitChangeset(workInfo, response);
+  _changesetMutex.unlock();
+  // An empty result means there was nothing to split out, leave the queue untouched
+  if (splitOff->size() <= 0)
+    return false;
+  // Queue the split-off part first, then the original, under the work queue mutex
+  _workQueueMutex.lock();
+  _workQueue.push(splitOff);
+  _workQueue.push(workInfo);
+  _workQueueMutex.unlock();
+  return true;
+}
+
+
}