chore(xmlupload): add metrics flag (DEV-1512) #264

Merged
merged 7 commits on Dec 2, 2022
4 changes: 3 additions & 1 deletion knora/dsp_tools.py
@@ -92,6 +92,7 @@ def program(user_args: list[str]) -> None:
     parser_upload.add_argument('-S', '--sipi', type=str, default='http://0.0.0.0:1024', help='URL of SIPI server')
     parser_upload.add_argument('-v', '--verbose', action='store_true', help=verbose_text)
     parser_upload.add_argument('-I', '--incremental', action='store_true', help='Incremental XML upload')
+    parser_upload.add_argument('-m', '--metrics', action='store_true', help='Write metrics into ~/.dsp-tools')
     parser_upload.add_argument('xmlfile', help='path to xml file containing the data', default='data.xml')

     # excel2json
@@ -213,7 +214,8 @@ def program(user_args: list[str]) -> None:
                    imgdir=args.imgdir,
                    sipi=args.sipi,
                    verbose=args.verbose,
-                   incremental=args.incremental)
+                   incremental=args.incremental,
+                   save_metrics=args.metrics)
     elif args.action == 'excel2json':
         excel2json(data_model_files=args.data_model_files,
                    path_to_output_file=args.outfile)
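Taken together with the parser change above, metrics collection would be switched on from the command line roughly as `dsp-tools xmlupload --metrics data.xml`. The `xmlupload` subcommand name is assumed here; the `args.action` dispatch is only partially visible in this diff.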
135 changes: 79 additions & 56 deletions knora/dsplib/utils/xml_upload.py
@@ -7,11 +7,13 @@
 import re
 import sys
 import uuid
+from collections import namedtuple
 from datetime import datetime
 from pathlib import Path
 from typing import Optional, cast, Tuple, Any
 from urllib.parse import quote_plus

+import pandas as pd
 from lxml import etree

 from knora.dsplib.models.connection import Connection
@@ -26,6 +28,8 @@
 from knora.dsplib.models.xmlresource import XMLResource
 from knora.dsplib.utils.shared import try_network_action, validate_xml_against_schema

+MetricRecord = namedtuple("MetricRecord", ["res_id", "filetype", "filesize_mb", "event", "duration_microsec", "mb_per_sec"])
Collaborator comment on this definition:
I like this pattern of using a data structure ("product type" in functional terminology) as a model for data in code. Personally I tend to implement this using dataclasses rather than namedtuples... but that's just a matter of taste.
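For comparison, a minimal sketch of the dataclass variant the reviewer mentions; this is not part of the PR, and the field types are inferred from how `MetricRecord` is populated elsewhere in this diff:

```python
from dataclasses import dataclass, asdict
from typing import Union

@dataclass(frozen=True)
class MetricRecord:
    res_id: str
    filetype: str
    filesize_mb: Union[float, str]  # "" for events without an associated file
    event: str
    duration_microsec: int
    mb_per_sec: Union[float, str]   # "" for events without a throughput

# A list of namedtuples can be handed to pandas directly (pd.DataFrame(metrics));
# with dataclasses the records would be converted first:
# df = pd.DataFrame([asdict(r) for r in metrics])
```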



 def _remove_circular_references(resources: list[XMLResource], verbose: bool) -> \
         tuple[list[XMLResource],
@@ -278,7 +282,7 @@ def _check_consistency_with_ontology(


 def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: str, sipi: str, verbose: bool,
-               incremental: bool) -> bool:
+               incremental: bool, save_metrics: bool) -> bool:
     """
     This function reads an XML file and imports the data described in it onto the DSP server.
@@ -291,12 +295,16 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
         sipi: the sipi instance to be used
         verbose: verbose option for the command, if used more output is given to the user
         incremental: if set, IRIs instead of internal IDs are expected as resource pointers
+        save_metrics: if true, saves time measurements into ~/.dsp-tools

     Returns:
         True if all resources could be uploaded without errors; False if any resource (or part of it) could not be
         successfully uploaded
     """

+    metrics: list[MetricRecord] = []
+    preparation_start = datetime.now()
+
     # Validate the input XML file
     try:
         validate_xml_against_schema(input_file)
@@ -306,12 +314,18 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
         quit(0)

     if sys.platform.startswith("darwin") or sys.platform.startswith("linux"):
-        save_location = f"{os.path.expanduser('~')}/.dsp-tools"
-    elif sys.platform.startswith("win"):
-        save_location = "."
+        save_location = Path.home() / Path(".dsp-tools")
     else:
-        save_location = "."
+        # TODO: use the home directory provided by Pathlib
+        save_location = Path.cwd()
+    server_as_foldername = server
+    server_substitutions = {
+        r"https?://": "",
+        r"^api\..+": "",
+        r":\d{4}/?$": "",
+        r"0.0.0.0": "localhost"
+    }
+    for pattern, repl in server_substitutions.items():
+        server_as_foldername = re.sub(pattern, repl, server_as_foldername)

     # Connect to the DaSCH Service Platform API and get the project context
     con = Connection(server)
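The substitution table above normalises a server URL into a string that is safe to use as a folder name. A standalone sketch with two worked examples, using the same patterns (the example URLs are hypothetical):

```python
import re

server_substitutions = {
    r"https?://": "",
    r"^api\..+": "",
    r":\d{4}/?$": "",
    r"0.0.0.0": "localhost",
}

def as_foldername(server: str) -> str:
    # the dict preserves insertion order (Python 3.7+): the scheme is stripped
    # first, then api-prefixed hosts are blanked, the port is removed, and
    # 0.0.0.0 is rewritten to localhost
    for pattern, repl in server_substitutions.items():
        server = re.sub(pattern, repl, server)
    return server

print(as_foldername("http://0.0.0.0:3333"))        # -> "localhost"
print(as_foldername("https://admin.dasch.swiss"))  # -> "admin.dasch.swiss"
```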
Expand Down Expand Up @@ -353,12 +367,18 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
stashed_xml_texts = dict()
stashed_resptr_props = dict()

preparation_time = datetime.now() - preparation_start
preparation_time_microsec = preparation_time.seconds * 1_000_000 + preparation_time.microseconds
jnussbaum marked this conversation as resolved.
Show resolved Hide resolved
jnussbaum marked this conversation as resolved.
Show resolved Hide resolved
metrics.append(MetricRecord("", "", "", "xml upload preparation", preparation_time_microsec, ""))

# upload all resources
id2iri_mapping: dict[str, str] = {}
failed_uploads: list[str] = []
try:
id2iri_mapping, failed_uploads = _upload_resources(resources, imgdir, sipi_server, permissions_lookup,
resclass_name_2_type, id2iri_mapping, con, failed_uploads)
id2iri_mapping, failed_uploads, metrics = _upload_resources(
resources, imgdir, sipi_server, permissions_lookup, resclass_name_2_type, id2iri_mapping, con,
failed_uploads, metrics
)
except BaseException as err:
_handle_upload_error(
err=err,
@@ -368,7 +388,7 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
             stashed_xml_texts=stashed_xml_texts,
             stashed_resptr_props=stashed_resptr_props,
             proj_shortcode=shortcode,
             onto_name=default_ontology,
-            server=server,
+            server_as_foldername=server_as_foldername,
             save_location=save_location
         )

@@ -387,7 +407,7 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
             stashed_resptr_props=stashed_resptr_props,
             proj_shortcode=shortcode,
             onto_name=default_ontology,
-            server=server,
+            server_as_foldername=server_as_foldername,
             save_location=save_location
         )

@@ -406,7 +426,7 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
             stashed_resptr_props=stashed_resptr_props,
             proj_shortcode=shortcode,
             onto_name=default_ontology,
-            server=server,
+            server_as_foldername=server_as_foldername,
             save_location=save_location
         )

@@ -420,6 +440,10 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
     if failed_uploads:
         print(f"\nWARNING: Could not upload the following resources: {failed_uploads}\n")
         success = False
+    if save_metrics:
+        os.makedirs("metrics", exist_ok=True)
+        df = pd.DataFrame(metrics)
+        df.to_csv(f"metrics/{timestamp_str}_metrics_{server_as_foldername}_{Path(input_file).stem}.csv")
     if success:
         print("All resources have successfully been uploaded.")

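The CSV written above can then be inspected with pandas; a sketch with a hypothetical file name (assuming `timestamp_str` was "2022-12-02_143000", a localhost upload, and an input file `test-data.xml`):

```python
import pandas as pd

df = pd.read_csv("metrics/2022-12-02_143000_metrics_localhost_test-data.csv")
# total time spent per event type ("bitstream upload", "resource creation", ...)
print(df.groupby("event")["duration_microsec"].sum())
```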
@@ -434,8 +458,9 @@ def _upload_resources(
         resclass_name_2_type: dict[str, type],
         id2iri_mapping: dict[str, str],
         con: Connection,
-        failed_uploads: list[str]
-) -> tuple[dict[str, str], list[str]]:
+        failed_uploads: list[str],
+        metrics: list[MetricRecord]
+) -> tuple[dict[str, str], list[str], list[MetricRecord]]:
     """
     Iterates through all resources and tries to upload them to DSP

@@ -448,20 +473,24 @@ def _upload_resources(
         id2iri_mapping: mapping of ids from the XML file to IRIs in DSP (initially empty, gets filled during the upload)
         con: connection to DSP
         failed_uploads: ids of resources that could not be uploaded (initially empty, gets filled during the upload)
+        metrics: list with the metric records collected until now (gets filled during the upload)

     Returns:
-        id2iri_mapping, failed_uploads: These two arguments are modified during the upload
+        id2iri_mapping, failed_uploads, metrics
     """

     # If there are multimedia files: calculate their total size
-    bitstream_all_sizes_mb = [os.path.getsize(os.path.join(imgdir, res.bitstream.value)) / 1000000 for res in resources if res.bitstream]
-    if len(bitstream_all_sizes_mb) > 0:
+    bitstream_all_sizes_mb = [Path(Path(imgdir) / Path(res.bitstream.value)).stat().st_size / 1000000 if res.bitstream else 0.0 for res in resources]
+    if sum(bitstream_all_sizes_mb) > 0:
         bitstream_size_total_mb = round(sum(bitstream_all_sizes_mb), 1)
-        bitstream_all_sizes_iterator = iter(bitstream_all_sizes_mb)  # for later reuse, to avoid later system calls
         bitstream_size_uploaded_mb = 0.0
         print(f"This xmlupload contains multimedia files with a total size of {bitstream_size_total_mb} MB.")

     for i, resource in enumerate(resources):
+        resource_start = datetime.now()
+        filetype = ""
+        filesize = round(bitstream_all_sizes_mb[i], 1)
+        bitstream_time_microsec = None
         resource_iri = resource.iri
         if resource.ark:
             resource_iri = _convert_ark_v0_to_resource_iri(resource.ark)
@@ -470,46 +499,47 @@ def _upload_resources(
         resource_bitstream = None
         if resource.bitstream:
             try:
+                bitstream_start = datetime.now()
+                filetype = Path(resource.bitstream.value).suffix[1:]
                 img: Optional[dict[Any, Any]] = try_network_action(
-                    action=lambda: sipi_server.upload_bitstream(filepath=os.path.join(imgdir, resource.bitstream.value)),  # type: ignore
+                    action=lambda: sipi_server.upload_bitstream(filepath=str(Path(imgdir) / Path(resource.bitstream.value))),  # type: ignore
                     failure_msg=f'ERROR while trying to upload file "{resource.bitstream.value}" of resource '
                                 f'"{resource.label}" ({resource.id}).'
                 )
+                bitstream_time = datetime.now() - bitstream_start
+                bitstream_time_microsec = bitstream_time.seconds * 1_000_000 + bitstream_time.microseconds
+                mb_per_sec = round((filesize / bitstream_time_microsec) * 1_000_000, 1)
+                metrics.append(MetricRecord(resource.id, filetype, filesize, "bitstream upload", bitstream_time_microsec, mb_per_sec))
             except BaseError as err:
                 print(err.message)
                 failed_uploads.append(resource.id)
                 continue
-            bitstream_size_uploaded_mb += next(bitstream_all_sizes_iterator)
+            bitstream_size_uploaded_mb += bitstream_all_sizes_mb[i]
             print(f"Uploaded file '{resource.bitstream.value}' ({bitstream_size_uploaded_mb:.1f} MB / {bitstream_size_total_mb} MB)")
             internal_file_name_bitstream = img['uploadedFiles'][0]['internalFilename']  # type: ignore
             resource_bitstream = resource.get_bitstream(internal_file_name_bitstream, permissions_lookup)

         # create the resource in DSP
         resclass_type = resclass_name_2_type[resource.restype]
         properties = resource.get_propvals(id2iri_mapping, permissions_lookup)
-        resource_instance: ResourceInstance = resclass_type(
-            con=con,
-            label=resource.label,
-            iri=resource_iri,
-            permissions=permissions_lookup.get(resource.permissions),  # type: ignore
-            creation_date=resource.creation_date,
-            bitstream=resource_bitstream,
-            values=properties
-        )
+        try:
+            resource_instance: ResourceInstance = try_network_action(
+                action=lambda: resclass_type(
+                    con=con,
+                    label=resource.label,
+                    iri=resource_iri,
+                    permissions=permissions_lookup.get(resource.permissions),  # type: ignore
+                    creation_date=resource.creation_date,
+                    bitstream=resource_bitstream,
+                    values=properties
+                ),
+                failure_msg=f"ERROR while trying to create resource '{resource.label}' ({resource.id})."
+            )
+        except BaseError as err:
+            print(err.message)
+            failed_uploads.append(resource.id)
+            continue
+
         try:
+            resource_creation_start = datetime.now()
             created_resource: ResourceInstance = try_network_action(
                 action=lambda: resource_instance.create(),
                 failure_msg=f"ERROR while trying to create resource '{resource.label}' ({resource.id})."
             )
+            resource_creation_time = datetime.now() - resource_creation_start
+            resource_creation_time_microsec = resource_creation_time.seconds * 1_000_000 + resource_creation_time.microseconds
+            metrics.append(MetricRecord(resource.id, filetype, filesize, "resource creation", resource_creation_time_microsec, ""))
         except BaseError as err:
             print(err.message)
             failed_uploads.append(resource.id)
@@ -518,7 +548,12 @@ def _upload_resources(
         print(f"Created resource {i+1}/{len(resources)}: '{created_resource.label}' (ID: '{resource.id}', IRI: "
               f"'{created_resource.iri}')")

-    return id2iri_mapping, failed_uploads
+        resource_time = datetime.now() - resource_start
+        resource_time_microsec = resource_time.seconds * 1_000_000 + resource_time.microseconds
+        looping_overhead_microsec = resource_time_microsec - resource_creation_time_microsec - (bitstream_time_microsec or 0)
+        metrics.append(MetricRecord(resource.id, filetype, filesize, "looping overhead", looping_overhead_microsec, ""))
+
+    return id2iri_mapping, failed_uploads, metrics


 def _upload_stashed_xml_texts(
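One observation on the timing arithmetic used throughout this diff: `duration.seconds * 1_000_000 + duration.microseconds` drops `timedelta.days`, which is harmless for uploads shorter than a day. A helper that would also cover longer runs, as a sketch rather than part of the PR:

```python
from datetime import timedelta

def to_microsec(delta: timedelta) -> int:
    # timedelta normalises to (days, seconds, microseconds); .seconds alone wraps at 24 h
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds

assert to_microsec(timedelta(seconds=1, microseconds=500)) == 1_000_500
```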
@@ -747,8 +782,8 @@ def _handle_upload_error(
         stashed_resptr_props: dict[XMLResource, dict[XMLProperty, list[str]]],
         proj_shortcode: str,
         onto_name: str,
-        server: str,
-        save_location: str
+        server_as_foldername: str,
+        save_location: Path
 ) -> None:
     """
     In case the xmlupload must be interrupted, e.g. because of an error that could not be handled, or due to keyboard
@@ -764,8 +799,8 @@ def _handle_upload_error(
         stashed_resptr_props: all resptr props that have been stashed
         proj_shortcode: shortcode of the project the data belongs to
         onto_name: name of the ontology the data references
-        server: the server which the data is uploaded onto
-        save_location: path to the directory where dsp-tools should save logs (OS dependent)
+        server_as_foldername: the server which the data is uploaded onto (in a form that can be used as folder name)
+        save_location: path to the directory where dsp-tools should save logs

     Returns:
         None
@@ -775,20 +810,8 @@ def _handle_upload_error(
           f'\nxmlupload must be aborted because of an error')
     timestamp_str = datetime.now().strftime("%Y-%m-%d_%H%M%S")

-    server_substitutions = {
-        r"https?://": "",
-        r"^api\..+": "",
-        r":\d{4}/?$": "",
-        r"0.0.0.0": "localhost"
-    }
-    for pattern, repl in server_substitutions.items():
-        server = re.sub(pattern, repl, server)
-
-    if save_location == ".":
-        save_location_full = f"xmluploads/{server}/{proj_shortcode}/{onto_name}"
-    else:
-        save_location_full = f"{save_location}/xmluploads/{server}/{proj_shortcode}/{onto_name}"
-    os.makedirs(save_location_full, exist_ok=True)
+    save_location_full = save_location / "xmluploads" / server_as_foldername / proj_shortcode / onto_name
+    save_location_full.mkdir(parents=True, exist_ok=True)

     # only stashed properties of resources that already exist in DSP are of interest
     stashed_xml_texts = _purge_stashed_xml_texts(stashed_xml_texts, id2iri_mapping)
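The pathlib idiom that replaces the string concatenation and `os.makedirs` call above, as a self-contained sketch ("localhost", "0001" and "testonto" are hypothetical stand-ins for `server_as_foldername`, `proj_shortcode` and `onto_name`):

```python
from pathlib import Path

save_location = Path.home() / ".dsp-tools"
save_location_full = save_location / "xmluploads" / "localhost" / "0001" / "testonto"
# creates intermediate directories; no error if the path already exists
save_location_full.mkdir(parents=True, exist_ok=True)
```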
3 changes: 2 additions & 1 deletion test/e2e/test_0123_import_scripts.py
@@ -62,7 +62,8 @@ def test_import_scripts(self) -> None:
             imgdir="knora/dsplib/import_scripts/",
             sipi="http://0.0.0.0:1024",
             verbose=False,
-            incremental=False
+            incremental=False,
+            save_metrics=False
         )
         self.assertTrue(success_on_xmlupload)

9 changes: 6 additions & 3 deletions test/e2e/test_tools.py
@@ -251,7 +251,8 @@ def test_xml_upload(self) -> None:
             imgdir=self.imgdir,
             sipi=self.sipi,
             verbose=False,
-            incremental=False)
+            incremental=False,
+            save_metrics=False)
         self.assertTrue(result_minimal)

         result_systematic = xml_upload(
@@ -262,7 +263,8 @@ def test_xml_upload(self) -> None:
             imgdir=self.imgdir,
             sipi=self.sipi,
             verbose=False,
-            incremental=False)
+            incremental=False,
+            save_metrics=False)
         self.assertTrue(result_systematic)

mapping_file = ""
@@ -287,7 +289,8 @@ def test_xml_upload(self) -> None:
             imgdir=self.imgdir,
             sipi=self.sipi,
             verbose=True,
-            incremental=True
+            incremental=True,
+            save_metrics=False
         )
         self.assertTrue(result_replaced)
         self.assertTrue(all([not f.name.startswith("stashed_text_properties_") for f in os.scandir(".")]))