v0.2.47..v0.2.48 changeset ChangesetCreator.cpp
Garret Voltz edited this page Sep 27, 2019
·
1 revision
diff --git a/hoot-core/src/main/cpp/hoot/core/algorithms/changeset/ChangesetCreator.cpp b/hoot-core/src/main/cpp/hoot/core/algorithms/changeset/ChangesetCreator.cpp
new file mode 100644
index 0000000..aecc0d4
--- /dev/null
+++ b/hoot-core/src/main/cpp/hoot/core/algorithms/changeset/ChangesetCreator.cpp
@@ -0,0 +1,642 @@
+/*
+ * This file is part of Hootenanny.
+ *
+ * Hootenanny is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * --------------------------------------------------------------------
+ *
+ * The following copyright notices are generated automatically. If you
+ * have a new notice to add, please use the format:
+ * " * @copyright Copyright ..."
+ * This will properly maintain the copyright information. DigitalGlobe
+ * copyrights will be updated automatically.
+ *
+ * @copyright Copyright (C) 2019 DigitalGlobe (http://www.digitalglobe.com/)
+ */
+#include "ChangesetCreator.h"
+
+// Hoot
+#include <hoot/core/criterion/NotCriterion.h>
+#include <hoot/core/criterion/TagKeyCriterion.h>
+#include <hoot/core/elements/ExternalMergeElementSorter.h>
+#include <hoot/core/elements/InMemoryElementSorter.h>
+#include <hoot/core/io/ElementCriterionVisitorInputStream.h>
+#include <hoot/core/io/ElementStreamer.h>
+#include <hoot/core/io/OsmApiDbSqlChangesetFileWriter.h>
+#include <hoot/core/io/OsmMapReaderFactory.h>
+#include <hoot/core/io/OsmMapWriterFactory.h>
+#include <hoot/core/io/OsmPbfReader.h>
+#include <hoot/core/io/OsmXmlChangesetFileWriter.h>
+#include <hoot/core/io/PartialOsmMapReader.h>
+#include <hoot/core/ops/NamedOp.h>
+#include <hoot/core/util/ConfigOptions.h>
+#include <hoot/core/util/Factory.h>
+#include <hoot/core/util/GeometryUtils.h>
+#include <hoot/core/util/IoUtils.h>
+#include <hoot/core/util/MapProjector.h>
+#include <hoot/core/util/Progress.h>
+#include <hoot/core/visitors/ApiTagTruncateVisitor.h>
+#include <hoot/core/visitors/RemoveElementsVisitor.h>
+#include <hoot/core/visitors/RemoveUnknownVisitor.h>
+#include <hoot/core/util/ConfigUtils.h>
+
+//GEOS
+#include <geos/geom/Envelope.h>
+
+namespace hoot
+{
+
+// Source label passed to every Progress instance constructed by this class.
+const QString ChangesetCreator::JOB_SOURCE = "Derive Changeset";
+
+/**
+ * Constructor; the task counters start at zero and single input mode starts disabled.
+ *
+ * @param printStats stored in _printStats; when true, _streamChangesetOutput logs the changeset
+ * stats after writing
+ * @param osmApiDbUrl stored in _osmApiDbUrl; create() requires it to be non-empty when the output
+ * ends with ".osc.sql"
+ */
+ChangesetCreator::ChangesetCreator(const bool printStats, const QString osmApiDbUrl) :
+_osmApiDbUrl(osmApiDbUrl),
+_numTotalTasks(0),
+_currentTaskNum(0),
+_printStats(printStats),
+_singleInput(false)
+{
+}
+
+/**
+ * Derives a changeset from one or two inputs and writes it to the specified output file.
+ *
+ * With two inputs, the changeset is the difference between input1 (former state of the data) and
+ * input2 (newer state of the data). When input2 is empty, all of input1 is treated as the newer
+ * state and compared against an empty stream, so the changeset is made up entirely of input1.
+ *
+ * @param output changeset output path; must end with ".osc" or ".osc.sql"
+ * @param input1 first input; the former state of the data when input2 is also specified
+ * @param input2 second input; the newer state of the data, or empty/whitespace for single input
+ * @throws HootException if the output format is unsupported
+ * @throws IllegalArgumentException if SQL output is requested and no OSM API database URL was
+ * passed to the constructor
+ */
+void ChangesetCreator::create(const QString& output, const QString& input1, const QString& input2)
+{
+  if (!_isSupportedOutputFormat(output))
+  {
+    throw HootException("Unsupported changeset output format: " + output);
+  }
+  else if (output.endsWith(".osc.sql") && _osmApiDbUrl.isEmpty())
+  {
+    throw IllegalArgumentException(
+      "Output to SQL changeset requires an OSM API database URL be specified.");
+  }
+
+  LOG_DEBUG(
+    "Creating changeset from inputs: " << input1 << " and " << input2 << " to output: " <<
+    output << "...");
+
+  _singleInput = input2.trimmed().isEmpty();
+  LOG_VARD(_singleInput);
+  // both inputs must support streaming to use streaming I/O
+  const bool useStreamingIo =
+    _inputIsStreamable(input1) && (_singleInput || _inputIsStreamable(input2));
+  LOG_VARD(useStreamingIo);
+
+  // The number of steps here must be updated as you add/remove job steps in the logic. The two
+  // base tasks are sorting the input elements and writing the changeset.
+  _numTotalTasks = 2;
+  // for non-streamable convert ops and other inline ops that occur when not streaming
+  if (!useStreamingIo)
+  {
+    // For non-streaming I/O we can divide each data conversion task into a separate step, hence
+    // the larger number of steps. With streaming I/O that isn't possible since all the data
+    // conversion operations are executed inline at the same time the data is read in.
+    _numTotalTasks += 3;
+    if (ConfigOptions().getConvertOps().size() > 0)
+    {
+      // Convert ops get a single task, which NamedOp will break down into sub-tasks during
+      // progress reporting.
+      _numTotalTasks++;
+      if (!ElementStreamer::areValidStreamingOps(ConfigOptions().getConvertOps()))
+      {
+        // Have the extra work of combining and separating data inputs when any of the convert
+        // ops aren't streamable.
+        _numTotalTasks++;
+      }
+    }
+  }
+
+  _currentTaskNum = 1;
+  Progress progress(ConfigOptions().getJobId(), JOB_SOURCE, Progress::JobState::Running);
+  const int maxFilePrintLength = ConfigOptions().getProgressVarPrintLengthMax();
+
+  progress.set(
+    0.0,
+    "Deriving output changeset: ..." + output.right(maxFilePrintLength) + " from inputs: ..." +
+    input1.right(maxFilePrintLength) + " and ..." + input2.right(maxFilePrintLength) + "...");
+
+  //sortedElements1 is the former state of the data
+  ElementInputStreamPtr sortedElements1;
+  //sortedElements2 is the newer state of the data
+  ElementInputStreamPtr sortedElements2;
+
+  // If we have two inputs, we'll determine the difference between them as the changeset.
+  // Otherwise, we're passing all the input data through to the output changeset, so put it in
+  // the sortedElements2 newer data and leave the first one empty. The result will be a changeset
+  // made up completely of what is in the single input.
+
+  if (!useStreamingIo)
+  {
+    // In the case that not all input formats or convert ops are streamable or the
+    // user chose to force in memory streaming by not specifying a sort buffer size, let's
+    // use memory bound sorting.
+
+    // read both inputs completely; this advances _currentTaskNum once per completed step
+    OsmMapPtr map1(new OsmMap());
+    OsmMapPtr map2(new OsmMap());
+    _readInputsFully(input1, input2, map1, map2, progress);
+
+    // TODO: There need to be checks here to only sort if the input isn't already sorted like
+    // there are for the external sorting (e.g. pre-sorted PBF file).
+
+    progress.set(
+      static_cast<float>(_currentTaskNum - 1) / static_cast<float>(_numTotalTasks),
+      "Sorting input elements; task #" + QString::number(_currentTaskNum) + "...");
+    if (!_singleInput)
+    {
+      sortedElements1 = _sortElementsInMemory(map1);
+      assert(map2.get());
+      sortedElements2 = _sortElementsInMemory(map2);
+    }
+    else
+    {
+      sortedElements1 = _getEmptyInputStream();
+      sortedElements2 = _sortElementsInMemory(map1);
+    }
+    _currentTaskNum++;
+  }
+  else
+  {
+    // If external sorting is enabled and the input and convert ops are streamable, externally
+    // sort the elements to avoid potential memory issues.
+
+    // _getExternallySortedElements advances _currentTaskNum on every call, but sorting is only
+    // budgeted as a single task in _numTotalTasks. Pin the counter to the sorting task so that
+    // the progress fraction reported for the write step can't reach or exceed 1.0 prematurely.
+    const int sortingTaskNum = _currentTaskNum;
+    if (!_singleInput)
+    {
+      sortedElements1 = _getExternallySortedElements(input1, progress);
+      // report the second sort against the same task as the first
+      _currentTaskNum = sortingTaskNum;
+      sortedElements2 = _getExternallySortedElements(input2, progress);
+    }
+    else
+    {
+      sortedElements1 = _getEmptyInputStream();
+      sortedElements2 = _getExternallySortedElements(input1, progress);
+    }
+    _currentTaskNum = sortingTaskNum + 1;
+  }
+
+  // write out the changeset file
+  assert(sortedElements1.get() && sortedElements2.get());
+  progress.set(
+    static_cast<float>(_currentTaskNum - 1) / static_cast<float>(_numTotalTasks),
+    "Writing changeset...");
+  _streamChangesetOutput(sortedElements1, sortedElements2, output);
+  _currentTaskNum++;
+
+  progress.set(
+    1.0, Progress::JobState::Successful,
+    "Changeset written to: ..." + output.right(maxFilePrintLength));
+}
+
+/**
+ * Derives a changeset from maps that are already loaded and writes it to the specified output
+ * file. Review elements are removed from the maps and over-length tags are truncated in place
+ * before the changeset is derived.
+ *
+ * @param map1 the former state of the data when map2 is populated; otherwise the only input,
+ * which is then treated as the newer state. Must not be null.
+ * @param map2 the newer state of the data; may be null for single input changeset derivation
+ * @param output changeset output path
+ */
+void ChangesetCreator::create(OsmMapPtr& map1, OsmMapPtr& map2, const QString& output)
+{
+  // map2 is optional (see the single input branch below), so it must never be dereferenced
+  // without checking it first.
+  const bool hasSecondMap = map2.get() != nullptr;
+
+  if (hasSecondMap)
+  {
+    LOG_DEBUG(
+      "Creating changeset from inputs: " << map1->getName() << " and " << map2->getName() <<
+      " to output: " << output << "...");
+  }
+  else
+  {
+    LOG_DEBUG(
+      "Creating changeset from input: " << map1->getName() << " to output: " << output << "...");
+  }
+  OsmMapWriterFactory::writeDebugMap(map1, "map1-before-changeset-derivation");
+  if (hasSecondMap)
+  {
+    OsmMapWriterFactory::writeDebugMap(map2, "map2-before-changeset-derivation");
+  }
+
+  // don't want to include review relations - may need to remove this depending on what happens
+  // with #3361
+  std::shared_ptr<TagKeyCriterion> elementCriterion(
+    new TagKeyCriterion(MetadataTags::HootReviewNeeds()));
+  RemoveElementsVisitor removeElementsVisitor;
+  removeElementsVisitor.setRecursive(false);
+  removeElementsVisitor.addCriterion(elementCriterion);
+  map1->visitRw(removeElementsVisitor);
+  if (hasSecondMap)
+  {
+    map2->visitRw(removeElementsVisitor);
+  }
+
+  // Truncate tags over 255 characters to push into OSM API.
+  ApiTagTruncateVisitor truncateTags;
+  map1->visitRw(truncateTags);
+  if (hasSecondMap)
+  {
+    map2->visitRw(truncateTags);
+  }
+
+  LOG_VARD(MapProjector::toWkt(map1->getProjection()));
+  if (hasSecondMap)
+  {
+    LOG_VARD(MapProjector::toWkt(map2->getProjection()));
+  }
+
+  //sortedElements1 is the former state of the data
+  ElementInputStreamPtr sortedElements1;
+  //sortedElements2 is the newer state of the data
+  ElementInputStreamPtr sortedElements2;
+
+  // no need to implement application of ops for this logic path
+
+  if (hasSecondMap)
+  {
+    sortedElements1 = _sortElementsInMemory(map1);
+    sortedElements2 = _sortElementsInMemory(map2);
+  }
+  else
+  {
+    // Single input: everything in map1 is the newer state, compared against an empty stream.
+    sortedElements1 = _getEmptyInputStream();
+    sortedElements2 = _sortElementsInMemory(map1);
+  }
+
+  // write out the changeset file
+  assert(sortedElements1.get() && sortedElements2.get());
+  _streamChangesetOutput(sortedElements1, sortedElements2, output);
+}
+
+/**
+ * Determines whether a changeset can be written to the given output.
+ *
+ * @param format output path to examine
+ * @return true if the path ends with ".osc" or ".osc.sql"; false otherwise
+ */
+bool ChangesetCreator::_isSupportedOutputFormat(const QString& format) const
+{
+  if (format.endsWith(".osc"))
+  {
+    return true;
+  }
+  return format.endsWith(".osc.sql");
+}
+
+/**
+ * Determines whether the elements of an input can be consumed without sorting them first.
+ *
+ * @param input path of the input to examine
+ * @return true only if no convert ops are configured and the PBF reader both supports the input
+ * and reports it as sorted
+ */
+bool ChangesetCreator::_inputIsSorted(const QString& input) const
+{
+  // Any configured op could change the element ordering, so never trust the input's ordering
+  // when ops are present.
+  const bool haveConvertOps = ConfigOptions().getConvertOps().size() > 0;
+  if (haveConvertOps)
+  {
+    return false;
+  }
+
+  // Streaming db inputs actually do not come back sorted, despite the order by id clause in the
+  // query (see ApiDb::selectElements). Otherwise, we'd skip sorting them too.
+
+  // PBF is the only format checked here, since it sets a sort flag.
+  return OsmPbfReader().isSupported(input) && OsmPbfReader().isSorted(input);
+}
+
+/**
+ * Determines whether an input can be processed with streaming I/O.
+ *
+ * @param input path of the input to examine
+ * @return true if the input format has an element input stream, all configured convert ops are
+ * valid streaming ops, and an element sorter buffer size other than -1 is configured
+ */
+bool ChangesetCreator::_inputIsStreamable(const QString& input) const
+{
+  LOG_VARD(OsmMapReaderFactory::hasElementInputStream(input));
+  LOG_VARD(ElementStreamer::areValidStreamingOps(ConfigOptions().getConvertOps()));
+  LOG_VARD(ConfigOptions().getElementSorterElementBufferSize());
+
+  // The input format itself must be streamable (partially read).
+  if (!OsmMapReaderFactory::hasElementInputStream(input))
+  {
+    return false;
+  }
+
+  // All ops must be streamable, otherwise we'll load both inputs into memory.
+  if (!ElementStreamer::areValidStreamingOps(ConfigOptions().getConvertOps()))
+  {
+    return false;
+  }
+
+  // If no sort buffer size is set, we sort in-memory. If we're already loading the data into
+  // memory for sorting, might as well force it into memory for the initial read as well.
+  return ConfigOptions().getElementSorterElementBufferSize() != -1;
+}
+
+/**
+ * Reads the inputs into a single combined map, applies the configured convert ops to that map,
+ * and then splits the result back out into separate maps by element status. Advances
+ * _currentTaskNum by three (read, convert ops, separate).
+ *
+ * @param input1 path of the first input
+ * @param input2 path of the second input; not read when in single input mode
+ * @param map1 output; in two input mode, the combined map with the Unknown2 removal visitor
+ * applied; in single input mode, the combined map (loaded as Unknown2) with the Unknown1 removal
+ * visitor applied
+ * @param map2 output; in two input mode, the combined map with the Unknown1 removal visitor
+ * applied; left untouched in single input mode, so it must already point to a map
+ * @param progress progress reporter, received by value
+ * @throws HootException if the second map can't be appended to the first, e.g. when the two
+ * inputs have element IDs in common
+ */
+void ChangesetCreator::_handleUnstreamableConvertOpsInMemory(const QString& input1,
+                                                             const QString& input2,
+                                                             OsmMapPtr& map1, OsmMapPtr& map2,
+                                                             Progress progress)
+{
+  LOG_DEBUG("Handling unstreamable convert ops in memory...");
+
+  progress.set(
+    (float)(_currentTaskNum - 1) / (float)_numTotalTasks, "Reading entire input ...");
+  OsmMapPtr fullMap(new OsmMap());
+  if (!_singleInput)
+  {
+    // We must preserve the original element IDs while loading in order for changeset derivation
+    // to work.
+
+    // Load the first map. If we have a bounded query, let's check for the crop related option
+    // overrides.
+    IoUtils::loadMap(fullMap, input1, true, Status::Unknown1);
+    OsmMapWriterFactory::writeDebugMap(fullMap, "after-initial-read-unstreamable-ref-map");
+
+    // append the second map onto the first one
+
+    OsmMapPtr tmpMap(new OsmMap());
+    IoUtils::loadMap(tmpMap, input2, true, Status::Unknown2);
+    OsmMapWriterFactory::writeDebugMap(tmpMap, "after-initial-read-unstreamable-sec-map");
+
+    try
+    {
+      fullMap->append(tmpMap);
+    }
+    catch (const HootException& e)
+    {
+      // If there were any element IDs in common between the two input files, we'll get this
+      // error. In that case we must fail.
+      if (e.getWhat().contains("already contains"))
+      {
+        throw HootException(
+          QString("It is not possible to run a non-streamable map operation ") +
+          QString("(OsmMapOperation) on two data sources with overlapping element IDs: ") +
+          e.what());
+      }
+      // Rethrow the exception object that was caught. "throw e;" would throw a copy typed as
+      // HootException, slicing off any more derived exception type.
+      throw;
+    }
+  }
+  else
+  {
+    // Just load the first map, but as unknown2 to end up with a changeset made up of just this
+    // input.
+    IoUtils::loadMap(fullMap, input1, true, Status::Unknown2);
+  }
+  LOG_VARD(fullMap->getElementCount());
+  OsmMapWriterFactory::writeDebugMap(fullMap, "after-initial-read-unstreamable-full-map");
+  _currentTaskNum++;
+
+  // Apply our convert ops to the entire map. If any of these are map consumers
+  // (OsmMapOperation), then some of them will exhibit undefined behavior if you try to exec them
+  // on the inputs separately.
+  LOG_DEBUG("Applying convert ops...");
+  NamedOp convertOps(ConfigOptions().getConvertOps());
+  convertOps.setProgress(
+    Progress(
+      ConfigOptions().getJobId(), JOB_SOURCE, Progress::JobState::Running,
+      (float)(_currentTaskNum - 1) / (float)_numTotalTasks, 1.0 / (float)_numTotalTasks));
+  convertOps.apply(fullMap);
+  // get back into wgs84 in case some op changed the proj
+  MapProjector::projectToWgs84(fullMap);
+  _currentTaskNum++;
+
+  // We need the two inputs separated for changeset derivation, so split them back out by status.
+  LOG_DEBUG("Separating maps by status...");
+  progress.set(
+    (float)(_currentTaskNum - 1) / (float)_numTotalTasks, "Separating out input maps...");
+  RemoveUnknown1Visitor remove1Vis;
+  RemoveUnknown2Visitor remove2Vis;
+  map1.reset(new OsmMap(fullMap));
+  if (!_singleInput)
+  {
+    map1->visitRw(remove2Vis);
+
+    map2.reset(new OsmMap(fullMap));
+    map2->visitRw(remove1Vis);
+  }
+  else
+  {
+    // The single input was loaded as unknown2, so only the unknown1 removal is applied here.
+    map1->visitRw(remove1Vis);
+  }
+  LOG_VARD(map1->getElementCount());
+  OsmMapWriterFactory::writeDebugMap(map1, "unstreamable-separated-map-1");
+  LOG_VARD(map2->getElementCount());
+  OsmMapWriterFactory::writeDebugMap(map2, "unstreamable-separated-map-2");
+  _currentTaskNum++;
+}
+
+/**
+ * Reads each input into its own map and applies the configured convert ops to each map
+ * separately. Advances _currentTaskNum by two (read, convert ops).
+ *
+ * @param input1 path of the first input
+ * @param input2 path of the second input; not read when in single input mode
+ * @param map1 map the first input is loaded into
+ * @param map2 map the second input is loaded into; not loaded in single input mode
+ * @param progress progress reporter, received by value
+ */
+void ChangesetCreator::_handleStreamableConvertOpsInMemory(const QString& input1,
+                                                           const QString& input2, OsmMapPtr& map1,
+                                                           OsmMapPtr& map2, Progress progress)
+{
+  LOG_DEBUG("Handling streamable convert ops in memory...");
+
+  // Source element IDs are preserved on every load below, which is important here.
+
+  // Unlike _handleUnstreamableConvertOpsInMemory, the crop related config opts don't need to be
+  // checked here, as a bounded query will always prevent us from streaming.
+
+  const float readFraction = (float)(_currentTaskNum - 1) / (float)_numTotalTasks;
+  progress.set(readFraction, "Reading entire input ...");
+  if (_singleInput)
+  {
+    // Only the first input exists. Load it as unknown2 so that the resulting changeset is made
+    // up of just this input.
+    IoUtils::loadMap(map1, input1, true, Status::Unknown2);
+  }
+  else
+  {
+    // Each input goes into its own map. With all convert ops streamable there's no reason to
+    // combine them, which also avoids the cost of splitting them back apart.
+    IoUtils::loadMap(map1, input1, true, Status::Unknown1);
+    IoUtils::loadMap(map2, input2, true, Status::Unknown2);
+  }
+  OsmMapWriterFactory::writeDebugMap(map1, "after-initial-read-streamable-map-1");
+  OsmMapWriterFactory::writeDebugMap(map2, "after-initial-read-streamable-map-2");
+  _currentTaskNum++;
+
+  // Run the convert ops against each map on its own.
+  LOG_DEBUG("Applying convert ops...");
+  NamedOp opsToApply(ConfigOptions().getConvertOps());
+  opsToApply.setProgress(
+    Progress(
+      ConfigOptions().getJobId(), JOB_SOURCE, Progress::JobState::Running,
+      (float)(_currentTaskNum - 1) / (float)_numTotalTasks, 1.0 / (float)_numTotalTasks));
+  opsToApply.apply(map1);
+  MapProjector::projectToWgs84(map1);
+  if (!_singleInput)
+  {
+    opsToApply.apply(map2);
+    MapProjector::projectToWgs84(map2);
+  }
+  _currentTaskNum++;
+}
+
+/**
+ * Reads the inputs completely into memory, applying any configured convert ops, and then removes
+ * review elements and truncates tags on the resulting maps. Advances _currentTaskNum once per
+ * completed step.
+ *
+ * @param input1 path of the first input
+ * @param input2 path of the second input; not read when in single input mode
+ * @param map1 map populated from the first input
+ * @param map2 map populated from the second input; not populated in single input mode
+ * @param progress progress reporter, received by value
+ */
+void ChangesetCreator::_readInputsFully(const QString& input1, const QString& input2,
+                                        OsmMapPtr& map1, OsmMapPtr& map2, Progress progress)
+{
+  LOG_VARD(ConfigOptions().getConvertOps().size());
+  const bool haveConvertOps = ConfigOptions().getConvertOps().size() > 0;
+  if (!haveConvertOps)
+  {
+    LOG_DEBUG("Processing inputs without convert ops...");
+
+    // With no convert ops to run, simply load everything, preserving source IDs.
+    const float readFraction = (float)(_currentTaskNum - 1) / (float)_numTotalTasks;
+    progress.set(readFraction, "Reading entire input...");
+    if (_singleInput)
+    {
+      // Only the first input exists. Load it as unknown2 so that the resulting changeset is made
+      // up of just this input.
+      IoUtils::loadMap(map1, input1, true, Status::Unknown2);
+    }
+    else
+    {
+      // Each input goes into its own map; see related comments in
+      // _handleUnstreamableConvertOpsInMemory
+      IoUtils::loadMap(map1, input1, true, Status::Unknown1);
+      IoUtils::loadMap(map2, input2, true, Status::Unknown2);
+    }
+    OsmMapWriterFactory::writeDebugMap(map1, "after-initial-read-no-ops-map-1");
+    OsmMapWriterFactory::writeDebugMap(map2, "after-initial-read-no-ops-map-2");
+    _currentTaskNum++;
+  }
+  else if (ElementStreamer::areValidStreamingOps(ConfigOptions().getConvertOps()))
+  {
+    // None of the ops are map consumers, so the inputs don't have to be loaded into the same
+    // map, which gets around the ID overlap problem.
+    _handleStreamableConvertOpsInMemory(input1, input2, map1, map2, progress);
+  }
+  else
+  {
+    // At least one of the convert ops is a map consumer, which requires combining both map
+    // inputs into one. If there are any overlapping element IDs between the two datasets, an
+    // error will occur.
+    //
+    // There could be situations where applying a non-streamable op separately to each map is
+    // acceptable, in which case overlapping element IDs wouldn't be an issue. If that situation
+    // arises, this will need to be refactored.
+    _handleUnstreamableConvertOpsInMemory(input1, input2, map1, map2, progress);
+  }
+
+  // Review relations are not wanted in the changeset.
+  const float reviewFraction = (float)(_currentTaskNum - 1) / (float)_numTotalTasks;
+  progress.set(reviewFraction, "Removing review relations...");
+  std::shared_ptr<TagKeyCriterion> reviewCrit(
+    new TagKeyCriterion(MetadataTags::HootReviewNeeds()));
+  RemoveElementsVisitor reviewRemover;
+  reviewRemover.setRecursive(false);
+  reviewRemover.addCriterion(reviewCrit);
+  map1->visitRw(reviewRemover);
+  if (!_singleInput)
+  {
+    map2->visitRw(reviewRemover);
+  }
+  OsmMapWriterFactory::writeDebugMap(map1, "after-remove-reviews-map-1");
+  OsmMapWriterFactory::writeDebugMap(map2, "after-remove-reviews-map-2");
+  _currentTaskNum++;
+
+  // Tags over 255 characters have to be truncated in order to push into OSM API.
+  const float truncateFraction = (float)(_currentTaskNum - 1) / (float)_numTotalTasks;
+  progress.set(truncateFraction, "Preparing tags for changeset...");
+  ApiTagTruncateVisitor tagTruncator;
+  map1->visitRw(tagTruncator);
+  if (!_singleInput)
+  {
+    map2->visitRw(tagTruncator);
+  }
+  OsmMapWriterFactory::writeDebugMap(map1, "after-truncate-tags-map-1");
+  OsmMapWriterFactory::writeDebugMap(map2, "after-truncate-tags-map-2");
+  _currentTaskNum++;
+}
+
+/**
+ * Retrieves a sorted, filtered element stream for an input, externally sorting the input only
+ * when it isn't already sorted. Advances _currentTaskNum by one.
+ *
+ * @param input path of the input to stream
+ * @param progress progress reporter, received by value
+ * @return a stream of the input's elements
+ */
+ElementInputStreamPtr ChangesetCreator::_getExternallySortedElements(const QString& input,
+                                                                     Progress progress)
+{
+  const float sortFraction = (float)(_currentTaskNum - 1) / (float)_numTotalTasks;
+  progress.set(sortFraction, "Sorting input elements ..." + input.right(25) + "...");
+
+  // Some in these datasets may have status=3 if you're loading conflated data, so use
+  // reader.use.file.status and reader.keep.status.tag if you want to retain that value.
+
+  // An input that is already sorted just gets a stream with the needed ops applied inline;
+  // anything else is sorted externally.
+  ElementInputStreamPtr elementStream =
+    _inputIsSorted(input) ? _getFilteredInputStream(input) : _sortElementsExternally(input);
+  _currentTaskNum++;
+
+  return elementStream;
+}
+
+/**
+ * Creates a placeholder element stream backed by a newly constructed map.
+ *
+ * @return an in-memory element sorter wrapping a new OsmMap
+ */
+ElementInputStreamPtr ChangesetCreator::_getEmptyInputStream()
+{
+  // Effectively a no-op; an InMemoryElementSorter that takes in an empty map will just return an
+  // empty element stream.
+  OsmMapPtr emptyMap(new OsmMap());
+  InMemoryElementSorterPtr emptySorter(new InMemoryElementSorter(emptyMap));
+  return emptySorter;
+}
+
+/**
+ * Opens a stream to an input which excludes review elements, truncates over-length tags, and
+ * applies the configured convert ops inline.
+ *
+ * @param input path of the input to stream
+ * @return the filtered element stream
+ * @throws HootException if the reader created for the input is not a PartialOsmMapReader
+ */
+ElementInputStreamPtr ChangesetCreator::_getFilteredInputStream(const QString& input)
+{
+  LOG_DEBUG("Retrieving filtered input stream for: " << input.right(25) << "...");
+
+  QList<ElementVisitorPtr> visitors;
+  // We don't want to include review relations.
+  std::shared_ptr<ElementCriterion> elementCriterion(
+    new NotCriterion(
+      std::shared_ptr<TagKeyCriterion>(
+        new TagKeyCriterion(MetadataTags::HootReviewNeeds()))));
+  // Tags need to be truncated if they are over 255 characters.
+  visitors.append(std::shared_ptr<ApiTagTruncateVisitor>(new ApiTagTruncateVisitor()));
+
+  // open a stream to the input data
+  std::shared_ptr<PartialOsmMapReader> reader =
+    std::dynamic_pointer_cast<PartialOsmMapReader>(
+      OsmMapReaderFactory::createReader(input));
+  if (!reader)
+  {
+    // The cast yields an empty pointer when the factory's reader can't be partially read; fail
+    // with a clear error rather than dereferencing it.
+    throw HootException("Unable to open a partial reader for streaming input: " + input);
+  }
+  reader->setUseDataSourceIds(true);
+  reader->open(input);
+  ElementInputStreamPtr inputStream = std::dynamic_pointer_cast<ElementInputStream>(reader);
+  ElementInputStreamPtr filteredInputStream(
+    new ElementCriterionVisitorInputStream(inputStream, elementCriterion, visitors));
+
+  // Add convert ops supporting streaming into the pipeline, if there are any. TODO: Any
+  // OsmMapOperations in the bunch need to operate on the entire map made up of both inputs to
+  // work correctly.
+  return
+    ElementStreamer::getFilteredInputStream(filteredInputStream, ConfigOptions().getConvertOps());
+}
+
+/**
+ * Wraps a map in an in-memory element sorter.
+ *
+ * @param map the map whose elements are to be streamed
+ * @return an element stream backed by an InMemoryElementSorter over the map
+ */
+ElementInputStreamPtr ChangesetCreator::_sortElementsInMemory(OsmMapPtr map)
+{
+  InMemoryElementSorterPtr inMemorySorter(new InMemoryElementSorter(map));
+  return inMemorySorter;
+}
+
+/**
+ * Sorts the filtered element stream of an input with an external merge sorter.
+ *
+ * @param input path of the input to sort
+ * @return the sorter, which serves as the stream of sorted elements
+ */
+ElementInputStreamPtr ChangesetCreator::_sortElementsExternally(const QString& input)
+{
+  std::shared_ptr<ExternalMergeElementSorter> externalSorter(new ExternalMergeElementSorter());
+  ElementInputStreamPtr unsortedElements = _getFilteredInputStream(input);
+  externalSorter->sort(unsortedElements);
+  return externalSorter;
+}
+
+/**
+ * Derives the changeset from the two element streams, writes it to the output file, and then
+ * closes both input streams.
+ *
+ * @param input1 stream of elements making up the former state of the data
+ * @param input2 stream of elements making up the newer state of the data
+ * @param output changeset output path; XML is written for ".osc" and SQL for ".osc.sql"
+ */
+void ChangesetCreator::_streamChangesetOutput(ElementInputStreamPtr input1,
+                                              ElementInputStreamPtr input2, const QString& output)
+{
+  LOG_INFO("Streaming changeset output to " << output.right(25) << "...");
+
+  // Only populated by the XML writer below.
+  QString statsTable;
+  LOG_VARD(output);
+
+  // Could this eventually be cleaned up to use OsmChangeWriterFactory and the OsmChange interface
+  // instead?
+  _changesetDeriver.reset(new ChangesetDeriver(input1, input2));
+  const bool isXmlOutput = output.endsWith(".osc");
+  if (isXmlOutput)
+  {
+    OsmXmlChangesetFileWriter xmlWriter;
+    xmlWriter.write(output, _changesetDeriver);
+    statsTable = xmlWriter.getStatsTable();
+  }
+  else if (output.endsWith(".osc.sql"))
+  {
+    assert(!_osmApiDbUrl.isEmpty());
+    const QUrl dbUrl(_osmApiDbUrl);
+    OsmApiDbSqlChangesetFileWriter sqlWriter(dbUrl);
+    sqlWriter.write(output, _changesetDeriver);
+  }
+
+  LOG_VARD(_changesetDeriver->getNumCreateChanges());
+  LOG_VARD(_changesetDeriver->getNumModifyChanges());
+  LOG_VARD(_changesetDeriver->getNumDeleteChanges());
+  LOG_VARD(_changesetDeriver->getNumFromElementsParsed());
+  LOG_VARD(_changesetDeriver->getNumToElementsParsed());
+  if (_changesetDeriver->getNumChanges() == 0)
+  {
+    LOG_WARN("No changes written to changeset.");
+  }
+
+  // Close the input streams; partial readers get finalized before they are closed.
+  auto finalizeAndClose = [](const ElementInputStreamPtr& stream)
+  {
+    std::shared_ptr<PartialOsmMapReader> partialReader =
+      std::dynamic_pointer_cast<PartialOsmMapReader>(stream);
+    if (partialReader)
+    {
+      partialReader->finalizePartial();
+    }
+    stream->close();
+  };
+  finalizeAndClose(input1);
+  finalizeAndClose(input2);
+
+  if (_printStats)
+  {
+    LOG_INFO("Changeset Stats:\n" << statsTable);
+  }
+}
+
+}