
Commit

chore(xmlupload): add metrics flag (DEV-1512) (#264)
jnussbaum committed Dec 2, 2022
1 parent efc9f51 commit f4822dc
Showing 5 changed files with 91 additions and 64 deletions.
1 change: 1 addition & 0 deletions docs/dsp-tools-usage.md
@@ -97,6 +97,7 @@ The following options are available:
- `-I` | `--incremental` (optional): If set, IRIs instead of internal IDs are expected as references to resources that already exist on DSP.
- `-V` | `--validate` (optional): If set, the XML file will only be validated, but not uploaded.
- `-v` | `--verbose` (optional): If set, more information about the process is printed to the console.
- `-m` | `--metrics` (optional): If set, write metrics into a "metrics" folder in the current working directory.

The command is used to upload data defined in an XML file onto a DSP server. The defaults are intended for local
testing:
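For illustration, the new `-m` flag maps to the `save_metrics` parameter of `xml_upload()`. A minimal sketch of the equivalent programmatic call, mirroring the e2e test calls further down in this diff (server, credentials, and paths are placeholder values for a local setup, not defaults confirmed by this commit):

```python
from knora.dsplib.utils.xml_upload import xml_upload

# placeholder values for a local test setup
success = xml_upload(
    input_file="data.xml",
    server="http://0.0.0.0:3333",
    user="root@example.com",
    password="test",
    imgdir=".",
    sipi="http://0.0.0.0:1024",
    verbose=False,
    incremental=False,
    save_metrics=True,  # writes a CSV into ./metrics/
)
```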
5 changes: 4 additions & 1 deletion knora/dsp_tools.py
@@ -92,6 +92,8 @@ def program(user_args: list[str]) -> None:
parser_upload.add_argument('-S', '--sipi', type=str, default='http://0.0.0.0:1024', help='URL of SIPI server')
parser_upload.add_argument('-v', '--verbose', action='store_true', help=verbose_text)
parser_upload.add_argument('-I', '--incremental', action='store_true', help='Incremental XML upload')
parser_upload.add_argument('-m', '--metrics', action='store_true', help='Write metrics into a "metrics" folder in '
'the current working directory')
parser_upload.add_argument('xmlfile', help='path to xml file containing the data', default='data.xml')

# excel2json
@@ -213,7 +215,8 @@ def program(user_args: list[str]) -> None:
imgdir=args.imgdir,
sipi=args.sipi,
verbose=args.verbose,
incremental=args.incremental)
incremental=args.incremental,
save_metrics=args.metrics)
elif args.action == 'excel2json':
excel2json(data_model_files=args.data_model_files,
path_to_output_file=args.outfile)
137 changes: 78 additions & 59 deletions knora/dsplib/utils/xml_upload.py
@@ -5,13 +5,14 @@
import json
import os
import re
import sys
import uuid
from collections import namedtuple
from datetime import datetime
from pathlib import Path
from typing import Optional, cast, Tuple, Any
from urllib.parse import quote_plus

import pandas as pd
from lxml import etree

from knora.dsplib.models.connection import Connection
@@ -26,6 +27,8 @@
from knora.dsplib.models.xmlresource import XMLResource
from knora.dsplib.utils.shared import try_network_action, validate_xml_against_schema

MetricRecord = namedtuple("MetricRecord", ["res_id", "filetype", "filesize_mb", "event", "duration_ms", "mb_per_sec"])
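As a sketch of what the metrics CSV contains: every timed event appends one `MetricRecord`, and the whole list is written out through pandas at the end of the upload. The values below are invented for illustration:

```python
import pandas as pd
from collections import namedtuple

MetricRecord = namedtuple("MetricRecord", ["res_id", "filetype", "filesize_mb", "event", "duration_ms", "mb_per_sec"])

metrics = [
    MetricRecord("", "", "", "xml upload preparation", 1234, ""),
    MetricRecord("res_1", "tif", 12.5, "bitstream upload", 5000, 2.5),
]
# pandas derives the column names from the namedtuple fields
pd.DataFrame(metrics).to_csv("example_metrics.csv")
```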


def _remove_circular_references(resources: list[XMLResource], verbose: bool) -> \
tuple[list[XMLResource],
@@ -278,7 +281,7 @@ def _check_consistency_with_ontology(


def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: str, sipi: str, verbose: bool,
incremental: bool) -> bool:
incremental: bool, save_metrics: bool) -> bool:
"""
This function reads an XML file and imports the data described in it onto the DSP server.
@@ -291,12 +294,16 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
sipi: the sipi instance to be used
verbose: verbose option for the command, if used more output is given to the user
incremental: if set, IRIs instead of internal IDs are expected as resource pointers
save_metrics: if true, saves time measurements into a "metrics" folder in the current working directory
Returns:
True if all resources could be uploaded without errors; False if any resource (or part of it) could not be
successfully uploaded
"""

metrics: list[MetricRecord] = []
preparation_start = datetime.now()

# Validate the input XML file
try:
validate_xml_against_schema(input_file)
@@ -305,13 +312,16 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
f"{err.message}")
quit(0)

if sys.platform.startswith("darwin") or sys.platform.startswith("linux"):
save_location = f"{os.path.expanduser('~')}/.dsp-tools"
elif sys.platform.startswith("win"):
save_location = "."
else:
save_location = "."
# TODO: use the home directory provided by Pathlib
save_location = Path.home() / Path(".dsp-tools")
server_as_foldername = server
server_substitutions = {
r"https?://": "",
r"^api\.": "",
r":\d{4}/?$": "",
r"0.0.0.0": "localhost"
}
for pattern, repl in server_substitutions.items():
server_as_foldername = re.sub(pattern, repl, server_as_foldername)
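# Example transformations (patterns applied in insertion order):
#   "https://api.dasch.swiss" -> "dasch.swiss"
#   "http://0.0.0.0:3333"     -> "localhost"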

# Connect to the DaSCH Service Platform API and get the project context
con = Connection(server)
@@ -353,12 +363,18 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
stashed_xml_texts = dict()
stashed_resptr_props = dict()

preparation_duration = datetime.now() - preparation_start
preparation_duration_ms = preparation_duration.seconds * 1000 + int(preparation_duration.microseconds / 1000)
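# (for sub-day durations, this equals int(preparation_duration.total_seconds() * 1000))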
metrics.append(MetricRecord("", "", "", "xml upload preparation", preparation_duration_ms, ""))

# upload all resources
id2iri_mapping: dict[str, str] = {}
failed_uploads: list[str] = []
try:
id2iri_mapping, failed_uploads = _upload_resources(resources, imgdir, sipi_server, permissions_lookup,
resclass_name_2_type, id2iri_mapping, con, failed_uploads)
id2iri_mapping, failed_uploads, metrics = _upload_resources(
resources, imgdir, sipi_server, permissions_lookup, resclass_name_2_type, id2iri_mapping, con,
failed_uploads, metrics
)
except BaseException as err:
_handle_upload_error(
err=err,
Expand All @@ -368,7 +384,7 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
stashed_resptr_props=stashed_resptr_props,
proj_shortcode=shortcode,
onto_name=default_ontology,
server=server,
server_as_foldername=server_as_foldername,
save_location=save_location
)

Expand All @@ -387,7 +403,7 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
stashed_resptr_props=stashed_resptr_props,
proj_shortcode=shortcode,
onto_name=default_ontology,
server=server,
server_as_foldername=server_as_foldername,
save_location=save_location
)

Expand All @@ -406,7 +422,7 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
stashed_resptr_props=stashed_resptr_props,
proj_shortcode=shortcode,
onto_name=default_ontology,
server=server,
server_as_foldername=server_as_foldername,
save_location=save_location
)

Expand All @@ -420,6 +436,10 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
if failed_uploads:
print(f"\nWARNING: Could not upload the following resources: {failed_uploads}\n")
success = False
if save_metrics:
os.makedirs("metrics", exist_ok=True)
df = pd.DataFrame(metrics)
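# note: without index=False, to_csv also writes the DataFrame's integer index as the first column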
df.to_csv(f"metrics/{timestamp_str}_metrics_{server_as_foldername}_{Path(input_file).stem}.csv")
if success:
print("All resources have successfully been uploaded.")

@@ -434,8 +454,9 @@ def _upload_resources(
resclass_name_2_type: dict[str, type],
id2iri_mapping: dict[str, str],
con: Connection,
failed_uploads: list[str]
) -> tuple[dict[str, str], list[str]]:
failed_uploads: list[str],
metrics: list[MetricRecord]
) -> tuple[dict[str, str], list[str], list[MetricRecord]]:
"""
Iterates through all resources and tries to upload them to DSP
@@ -448,20 +469,24 @@
id2iri_mapping: mapping of ids from the XML file to IRIs in DSP (initially empty, gets filled during the upload)
con: connection to DSP
failed_uploads: ids of resources that could not be uploaded (initially empty, gets filled during the upload)
metrics: list of the metric records collected so far (gets filled during the upload)
Returns:
id2iri_mapping, failed_uploads: These two arguments are modified during the upload
id2iri_mapping, failed_uploads, metrics
"""

# If there are multimedia files: calculate their total size
bitstream_all_sizes_mb = [os.path.getsize(os.path.join(imgdir, res.bitstream.value)) / 1000000 for res in resources if res.bitstream]
if len(bitstream_all_sizes_mb) > 0:
bitstream_all_sizes_mb = [Path(Path(imgdir) / Path(res.bitstream.value)).stat().st_size / 1000000 if res.bitstream else 0.0 for res in resources]
if sum(bitstream_all_sizes_mb) > 0:
bitstream_size_total_mb = round(sum(bitstream_all_sizes_mb), 1)
bitstream_all_sizes_iterator = iter(bitstream_all_sizes_mb) # for later reuse, to avoid later system calls
bitstream_size_uploaded_mb = 0.0
print(f"This xmlupload contains multimedia files with a total size of {bitstream_size_total_mb} MB.")

for i, resource in enumerate(resources):
resource_start = datetime.now()
filetype = ""
filesize = round(bitstream_all_sizes_mb[i], 1)
bitstream_duration_ms = None
resource_iri = resource.iri
if resource.ark:
resource_iri = _convert_ark_v0_to_resource_iri(resource.ark)
@@ -470,46 +495,47 @@
resource_bitstream = None
if resource.bitstream:
try:
bitstream_start = datetime.now()
filetype = Path(resource.bitstream.value).suffix[1:]
img: Optional[dict[Any, Any]] = try_network_action(
action=lambda: sipi_server.upload_bitstream(filepath=os.path.join(imgdir, resource.bitstream.value)), # type: ignore
action=lambda: sipi_server.upload_bitstream(filepath=str(Path(imgdir) / Path(resource.bitstream.value))), # type: ignore
failure_msg=f'ERROR while trying to upload file "{resource.bitstream.value}" of resource '
f'"{resource.label}" ({resource.id}).'
)
bitstream_duration = datetime.now() - bitstream_start
bitstream_duration_ms = bitstream_duration.seconds * 1000 + int(bitstream_duration.microseconds / 1000)
mb_per_sec = round((filesize / bitstream_duration_ms) * 1000, 1)
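# e.g. a 12.5 MB file uploaded in 5000 ms: round((12.5 / 5000) * 1000, 1) == 2.5 MB/s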
metrics.append(MetricRecord(resource.id, filetype, filesize, "bitstream upload", bitstream_duration_ms, mb_per_sec))
except BaseError as err:
print(err.message)
failed_uploads.append(resource.id)
continue
bitstream_size_uploaded_mb += next(bitstream_all_sizes_iterator)
bitstream_size_uploaded_mb += bitstream_all_sizes_mb[i]
print(f"Uploaded file '{resource.bitstream.value}' ({bitstream_size_uploaded_mb:.1f} MB / {bitstream_size_total_mb} MB)")
internal_file_name_bitstream = img['uploadedFiles'][0]['internalFilename'] # type: ignore
resource_bitstream = resource.get_bitstream(internal_file_name_bitstream, permissions_lookup)

# create the resource in DSP
resclass_type = resclass_name_2_type[resource.restype]
properties = resource.get_propvals(id2iri_mapping, permissions_lookup)
resource_instance: ResourceInstance = resclass_type(
con=con,
label=resource.label,
iri=resource_iri,
permissions=permissions_lookup.get(resource.permissions), # type: ignore
creation_date=resource.creation_date,
bitstream=resource_bitstream,
values=properties
)
try:
resource_instance: ResourceInstance = try_network_action(
action=lambda: resclass_type(
con=con,
label=resource.label,
iri=resource_iri,
permissions=permissions_lookup.get(resource.permissions), # type: ignore
creation_date=resource.creation_date,
bitstream=resource_bitstream,
values=properties
),
failure_msg=f"ERROR while trying to create resource '{resource.label}' ({resource.id})."
)
except BaseError as err:
print(err.message)
failed_uploads.append(resource.id)
continue

try:
resource_creation_start = datetime.now()
created_resource: ResourceInstance = try_network_action(
action=lambda: resource_instance.create(),
failure_msg=f"ERROR while trying to create resource '{resource.label}' ({resource.id})."
)
resource_creation_duration = datetime.now() - resource_creation_start
resource_creation_duration_ms = resource_creation_duration.seconds * 1000 + int(resource_creation_duration.microseconds / 1000)
metrics.append(MetricRecord(resource.id, filetype, filesize, "resource creation", resource_creation_duration_ms, ""))
except BaseError as err:
print(err.message)
failed_uploads.append(resource.id)
@@ -518,7 +544,12 @@
print(f"Created resource {i+1}/{len(resources)}: '{created_resource.label}' (ID: '{resource.id}', IRI: "
f"'{created_resource.iri}')")

return id2iri_mapping, failed_uploads
resource_duration = datetime.now() - resource_start
resource_duration_ms = resource_duration.seconds * 1000 + int(resource_duration.microseconds / 1000)
looping_overhead_ms = resource_duration_ms - resource_creation_duration_ms - (bitstream_duration_ms or 0)
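# e.g. 6200 ms total - 900 ms creation - 5000 ms bitstream upload -> 300 ms of looping overhead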
metrics.append(MetricRecord(resource.id, filetype, filesize, "looping overhead", looping_overhead_ms, ""))

return id2iri_mapping, failed_uploads, metrics


def _upload_stashed_xml_texts(
@@ -747,8 +778,8 @@ def _handle_upload_error(
stashed_resptr_props: dict[XMLResource, dict[XMLProperty, list[str]]],
proj_shortcode: str,
onto_name: str,
server: str,
save_location: str
server_as_foldername: str,
save_location: Path
) -> None:
"""
In case the xmlupload must be interrupted, e.g. because of an error that could not be handled, or due to keyboard
@@ -764,8 +795,8 @@
stashed_resptr_props: all resptr props that have been stashed
proj_shortcode: shortcode of the project the data belongs to
onto_name: name of the ontology the data references
server: the server which the data is uploaded onto
save_location: path to the directory where dsp-tools should save logs (OS dependent)
server_as_foldername: the server onto which the data is uploaded (in a form that can be used as a folder name)
save_location: path to the directory where the logs are saved
Returns:
None
@@ -775,20 +806,8 @@
f'\nxmlupload must be aborted because of an error')
timestamp_str = datetime.now().strftime("%Y-%m-%d_%H%M%S")

server_substitutions = {
r"https?://": "",
r"^api\..+": "",
r":\d{4}/?$": "",
r"0.0.0.0": "localhost"
}
for pattern, repl in server_substitutions.items():
server = re.sub(pattern, repl, server)

if save_location == ".":
save_location_full = f"xmluploads/{server}/{proj_shortcode}/{onto_name}"
else:
save_location_full = f"{save_location}/xmluploads/{server}/{proj_shortcode}/{onto_name}"
os.makedirs(save_location_full, exist_ok=True)
save_location_full = save_location / "xmluploads" / server_as_foldername / proj_shortcode / onto_name
save_location_full.mkdir(parents=True, exist_ok=True)

# only stashed properties of resources that already exist in DSP are of interest
stashed_xml_texts = _purge_stashed_xml_texts(stashed_xml_texts, id2iri_mapping)
3 changes: 2 additions & 1 deletion test/e2e/test_0123_import_scripts.py
@@ -62,7 +62,8 @@ def test_import_scripts(self) -> None:
imgdir="knora/dsplib/import_scripts/",
sipi="http://0.0.0.0:1024",
verbose=False,
incremental=False
incremental=False,
save_metrics=False
)
self.assertTrue(success_on_xmlupload)

9 changes: 6 additions & 3 deletions test/e2e/test_tools.py
@@ -251,7 +251,8 @@ def test_xml_upload(self) -> None:
imgdir=self.imgdir,
sipi=self.sipi,
verbose=False,
incremental=False)
incremental=False,
save_metrics=False)
self.assertTrue(result_minimal)

result_systematic = xml_upload(
@@ -262,7 +263,8 @@
imgdir=self.imgdir,
sipi=self.sipi,
verbose=False,
incremental=False)
incremental=False,
save_metrics=False)
self.assertTrue(result_systematic)

mapping_file = ""
@@ -287,7 +289,8 @@
imgdir=self.imgdir,
sipi=self.sipi,
verbose=True,
incremental=True
incremental=True,
save_metrics=False
)
self.assertTrue(result_replaced)
self.assertTrue(all([not f.name.startswith("stashed_text_properties_") for f in os.scandir(".")]))
