diff --git a/docs/dsp-tools-usage.md b/docs/dsp-tools-usage.md
index ebc96db95..2bcae4b7b 100644
--- a/docs/dsp-tools-usage.md
+++ b/docs/dsp-tools-usage.md
@@ -97,6 +97,7 @@
 The following options are available:
 - `-I` | `--incremental` (optional) : If set, IRIs instead of internal IDs are expected as reference to already existing resources on DSP
 - `-V` | `--validate` (optional): If set, the XML file will only be validated, but not uploaded.
 - `-v` | `--verbose` (optional): If set, more information about the process is printed to the console.
+- `-m` | `--metrics` (optional): If set, metrics about the upload are written into a "metrics" folder in the current working directory.
 
 The command is used to upload data defined in an XML file onto a DSP server. The defaults are intended for local testing:
diff --git a/knora/dsp_tools.py b/knora/dsp_tools.py
index 4a52c9677..13a807de4 100644
--- a/knora/dsp_tools.py
+++ b/knora/dsp_tools.py
@@ -92,6 +92,8 @@ def program(user_args: list[str]) -> None:
     parser_upload.add_argument('-S', '--sipi', type=str, default='http://0.0.0.0:1024', help='URL of SIPI server')
     parser_upload.add_argument('-v', '--verbose', action='store_true', help=verbose_text)
     parser_upload.add_argument('-I', '--incremental', action='store_true', help='Incremental XML upload')
+    parser_upload.add_argument('-m', '--metrics', action='store_true', help='Write metrics into a "metrics" folder in '
+                                                                            'the current working directory')
     parser_upload.add_argument('xmlfile', help='path to xml file containing the data', default='data.xml')
 
     # excel2json
@@ -213,7 +215,8 @@ def program(user_args: list[str]) -> None:
                    imgdir=args.imgdir,
                    sipi=args.sipi,
                    verbose=args.verbose,
-                   incremental=args.incremental)
+                   incremental=args.incremental,
+                   save_metrics=args.metrics)
     elif args.action == 'excel2json':
         excel2json(data_model_files=args.data_model_files,
                    path_to_output_file=args.outfile)
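The new flag can be smoke-tested through the `program()` entry point changed above. A minimal sketch, not part of the patch: it assumes a locally running DSP stack (as in the e2e tests further down) and that the upload action is registered under the name `xmlupload`; the `data.xml` path comes from the parser default.

```python
# Hypothetical smoke test for the new flag; assumes a local DSP stack is running
# and that the upload subcommand is named "xmlupload".
from knora.dsp_tools import program

# Equivalent to `dsp-tools xmlupload --metrics data.xml` on the command line;
# argparse maps --metrics to args.metrics, which program() passes on as save_metrics.
program(["xmlupload", "--metrics", "data.xml"])
```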
diff --git a/knora/dsplib/utils/xml_upload.py b/knora/dsplib/utils/xml_upload.py
index f0654f1d9..2ec781db4 100644
--- a/knora/dsplib/utils/xml_upload.py
+++ b/knora/dsplib/utils/xml_upload.py
@@ -5,13 +5,14 @@
 import json
 import os
 import re
-import sys
 import uuid
+from collections import namedtuple
 from datetime import datetime
 from pathlib import Path
 from typing import Optional, cast, Tuple, Any
 from urllib.parse import quote_plus
 
+import pandas as pd
 from lxml import etree
 
 from knora.dsplib.models.connection import Connection
@@ -26,6 +27,8 @@
 from knora.dsplib.models.xmlresource import XMLResource
 from knora.dsplib.utils.shared import try_network_action, validate_xml_against_schema
 
+MetricRecord = namedtuple("MetricRecord", ["res_id", "filetype", "filesize_mb", "event", "duration_ms", "mb_per_sec"])
+
 
 def _remove_circular_references(resources: list[XMLResource], verbose: bool) -> \
         tuple[list[XMLResource],
@@ -278,7 +281,7 @@ def _check_consistency_with_ontology(
 
 
 def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: str, sipi: str, verbose: bool,
-               incremental: bool) -> bool:
+               incremental: bool, save_metrics: bool) -> bool:
     """
     This function reads an XML file and imports the data described in it onto the DSP server.
 
@@ -291,12 +294,16 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
         sipi: the sipi instance to be used
         verbose: verbose option for the command, if used more output is given to the user
         incremental: if set, IRIs instead of internal IDs are expected as resource pointers
+        save_metrics: if true, saves time measurements into a "metrics" folder in the current working directory
 
     Returns:
        True if all resources could be uploaded without errors; False if any resource (or part of it) could not be
        successfully uploaded
     """
 
+    metrics: list[MetricRecord] = []
+    preparation_start = datetime.now()
+
     # Validate the input XML file
     try:
         validate_xml_against_schema(input_file)
@@ -305,13 +312,16 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
                 f"{err.message}")
         quit(0)
 
-    if sys.platform.startswith("darwin") or sys.platform.startswith("linux"):
-        save_location = f"{os.path.expanduser('~')}/.dsp-tools"
-    elif sys.platform.startswith("win"):
-        save_location = "."
-    else:
-        save_location = "."
-    # TODO: use the home directory provided by Pathlib
+    save_location = Path.home() / Path(".dsp-tools")
+    server_as_foldername = server
+    server_substitutions = {
+        r"https?://": "",
+        r"^api\..+": "",
+        r":\d{4}/?$": "",
+        r"0\.0\.0\.0": "localhost"
+    }
+    for pattern, repl in server_substitutions.items():
+        server_as_foldername = re.sub(pattern, repl, server_as_foldername)
 
     # Connect to the DaSCH Service Platform API and get the project context
     con = Connection(server)
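The substitution chain above is order-dependent, so a worked example may help. This standalone sketch reruns the same dict on the default local server URL (the URL is illustrative):

```python
# Standalone rerun of the substitution chain from the hunk above, for illustration.
import re

server_substitutions = {
    r"https?://": "",         # strip the scheme
    r"^api\..+": "",          # blank out hosts starting with "api."
    r":\d{4}/?$": "",         # strip a 4-digit port and optional trailing slash
    r"0\.0\.0\.0": "localhost"
}

server_as_foldername = "http://0.0.0.0:3333"
for pattern, repl in server_substitutions.items():
    server_as_foldername = re.sub(pattern, repl, server_as_foldername)
print(server_as_foldername)  # -> "localhost"
```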
@@ -353,12 +363,18 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
     stashed_xml_texts = dict()
     stashed_resptr_props = dict()
 
+    preparation_duration = datetime.now() - preparation_start
+    preparation_duration_ms = int(preparation_duration.total_seconds() * 1000)
+    metrics.append(MetricRecord("", "", "", "xml upload preparation", preparation_duration_ms, ""))
+
     # upload all resources
     id2iri_mapping: dict[str, str] = {}
     failed_uploads: list[str] = []
     try:
-        id2iri_mapping, failed_uploads = _upload_resources(resources, imgdir, sipi_server, permissions_lookup,
-                                                           resclass_name_2_type, id2iri_mapping, con, failed_uploads)
+        id2iri_mapping, failed_uploads, metrics = _upload_resources(
+            resources, imgdir, sipi_server, permissions_lookup, resclass_name_2_type, id2iri_mapping, con,
+            failed_uploads, metrics
+        )
     except BaseException as err:
         _handle_upload_error(
             err=err,
@@ -368,7 +384,7 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
             stashed_resptr_props=stashed_resptr_props,
             proj_shortcode=shortcode,
             onto_name=default_ontology,
-            server=server,
+            server_as_foldername=server_as_foldername,
             save_location=save_location
         )
 
@@ -387,7 +403,7 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
             stashed_resptr_props=stashed_resptr_props,
             proj_shortcode=shortcode,
             onto_name=default_ontology,
-            server=server,
+            server_as_foldername=server_as_foldername,
             save_location=save_location
         )
 
@@ -406,7 +422,7 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
             stashed_resptr_props=stashed_resptr_props,
             proj_shortcode=shortcode,
             onto_name=default_ontology,
-            server=server,
+            server_as_foldername=server_as_foldername,
             save_location=save_location
         )
 
@@ -420,6 +436,10 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
     if failed_uploads:
         print(f"\nWARNING: Could not upload the following resources: {failed_uploads}\n")
         success = False
+    if save_metrics:
+        os.makedirs("metrics", exist_ok=True)
+        df = pd.DataFrame(metrics)
+        df.to_csv(f"metrics/{timestamp_str}_metrics_{server_as_foldername}_{Path(input_file).stem}.csv")
 
     if success:
         print("All resources have successfully been uploaded.")
@@ -434,8 +454,9 @@ def _upload_resources(
     resclass_name_2_type: dict[str, type],
     id2iri_mapping: dict[str, str],
     con: Connection,
-    failed_uploads: list[str]
-) -> tuple[dict[str, str], list[str]]:
+    failed_uploads: list[str],
+    metrics: list[MetricRecord]
+) -> tuple[dict[str, str], list[str], list[MetricRecord]]:
     """
     Iterates through all resources and tries to upload them to DSP
 
@@ -448,20 +469,24 @@ def _upload_resources(
         id2iri_mapping: mapping of ids from the XML file to IRIs in DSP (initially empty, gets filled during the upload)
         con: connection to DSP
         failed_uploads: ids of resources that could not be uploaded (initially empty, gets filled during the upload)
+        metrics: list of the metric records collected so far (gets filled during the upload)
 
     Returns:
-        id2iri_mapping, failed_uploads: These two arguments are modified during the upload
+        id2iri_mapping, failed_uploads, metrics
     """
 
     # If there are multimedia files: calculate their total size
-    bitstream_all_sizes_mb = [os.path.getsize(os.path.join(imgdir, res.bitstream.value)) / 1000000 for res in resources if res.bitstream]
-    if len(bitstream_all_sizes_mb) > 0:
+    bitstream_all_sizes_mb = [Path(Path(imgdir) / Path(res.bitstream.value)).stat().st_size / 1000000 if res.bitstream else 0.0 for res in resources]
+    if sum(bitstream_all_sizes_mb) > 0:
         bitstream_size_total_mb = round(sum(bitstream_all_sizes_mb), 1)
-        bitstream_all_sizes_iterator = iter(bitstream_all_sizes_mb)  # for later reuse, to avoid later system calls
         bitstream_size_uploaded_mb = 0.0
         print(f"This xmlupload contains multimedia files with a total size of {bitstream_size_total_mb} MB.")
 
     for i, resource in enumerate(resources):
+        resource_start = datetime.now()
+        filetype = ""
+        filesize = round(bitstream_all_sizes_mb[i], 1)
+        bitstream_duration_ms = None
         resource_iri = resource.iri
         if resource.ark:
             resource_iri = _convert_ark_v0_to_resource_iri(resource.ark)
@@ -470,16 +495,22 @@ def _upload_resources(
         resource_bitstream = None
         if resource.bitstream:
             try:
+                bitstream_start = datetime.now()
+                filetype = Path(resource.bitstream.value).suffix[1:]
                 img: Optional[dict[Any, Any]] = try_network_action(
-                    action=lambda: sipi_server.upload_bitstream(filepath=os.path.join(imgdir, resource.bitstream.value)),  # type: ignore
+                    action=lambda: sipi_server.upload_bitstream(filepath=str(Path(imgdir) / Path(resource.bitstream.value))),  # type: ignore
                     failure_msg=f'ERROR while trying to upload file "{resource.bitstream.value}" of resource '
                                 f'"{resource.label}" ({resource.id}).'
                 )
+                bitstream_duration = datetime.now() - bitstream_start
+                bitstream_duration_ms = int(bitstream_duration.total_seconds() * 1000)
+                mb_per_sec = round((filesize / max(bitstream_duration_ms, 1)) * 1000, 1)
+                metrics.append(MetricRecord(resource.id, filetype, filesize, "bitstream upload", bitstream_duration_ms, mb_per_sec))
             except BaseError as err:
                 print(err.message)
                 failed_uploads.append(resource.id)
                 continue
-            bitstream_size_uploaded_mb += next(bitstream_all_sizes_iterator)
+            bitstream_size_uploaded_mb += bitstream_all_sizes_mb[i]
             print(f"Uploaded file '{resource.bitstream.value}' ({bitstream_size_uploaded_mb:.1f} MB / {bitstream_size_total_mb} MB)")
             internal_file_name_bitstream = img['uploadedFiles'][0]['internalFilename']  # type: ignore
             resource_bitstream = resource.get_bitstream(internal_file_name_bitstream, permissions_lookup)
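For clarity, the throughput recorded with each "bitstream upload" event is derived like this (the numbers are invented for illustration):

```python
# Illustrative check of the mb_per_sec formula used in the hunk above.
filesize = 14.3              # MB
bitstream_duration_ms = 2340  # ms
mb_per_sec = round((filesize / max(bitstream_duration_ms, 1)) * 1000, 1)
print(mb_per_sec)  # -> 6.1  (14.3 MB in 2.34 s)
```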
@@ -487,29 +518,24 @@ def _upload_resources(
         # create the resource in DSP
         resclass_type = resclass_name_2_type[resource.restype]
         properties = resource.get_propvals(id2iri_mapping, permissions_lookup)
+        resource_instance: ResourceInstance = resclass_type(
+            con=con,
+            label=resource.label,
+            iri=resource_iri,
+            permissions=permissions_lookup.get(resource.permissions),  # type: ignore
+            creation_date=resource.creation_date,
+            bitstream=resource_bitstream,
+            values=properties
+        )
         try:
-            resource_instance: ResourceInstance = try_network_action(
-                action=lambda: resclass_type(
-                    con=con,
-                    label=resource.label,
-                    iri=resource_iri,
-                    permissions=permissions_lookup.get(resource.permissions),  # type: ignore
-                    creation_date=resource.creation_date,
-                    bitstream=resource_bitstream,
-                    values=properties
-                ),
-                failure_msg=f"ERROR while trying to create resource '{resource.label}' ({resource.id})."
-            )
-        except BaseError as err:
-            print(err.message)
-            failed_uploads.append(resource.id)
-            continue
-
-        try:
+            resource_creation_start = datetime.now()
             created_resource: ResourceInstance = try_network_action(
                 action=lambda: resource_instance.create(),
                 failure_msg=f"ERROR while trying to create resource '{resource.label}' ({resource.id})."
             )
+            resource_creation_duration = datetime.now() - resource_creation_start
+            resource_creation_duration_ms = int(resource_creation_duration.total_seconds() * 1000)
+            metrics.append(MetricRecord(resource.id, filetype, filesize, "resource creation", resource_creation_duration_ms, ""))
         except BaseError as err:
             print(err.message)
             failed_uploads.append(resource.id)
@@ -518,7 +544,12 @@ def _upload_resources(
         print(f"Created resource {i+1}/{len(resources)}: '{created_resource.label}' (ID: '{resource.id}', IRI: "
               f"'{created_resource.iri}')")
 
-    return id2iri_mapping, failed_uploads
+        resource_duration = datetime.now() - resource_start
+        resource_duration_ms = int(resource_duration.total_seconds() * 1000)
+        looping_overhead_ms = resource_duration_ms - resource_creation_duration_ms - (bitstream_duration_ms or 0)
+        metrics.append(MetricRecord(resource.id, filetype, filesize, "looping overhead", looping_overhead_ms, ""))
+
+    return id2iri_mapping, failed_uploads, metrics
 
 
 def _upload_stashed_xml_texts(
@@ -747,8 +778,8 @@ def _handle_upload_error(
     stashed_resptr_props: dict[XMLResource, dict[XMLProperty, list[str]]],
     proj_shortcode: str,
    onto_name: str,
-    server: str,
-    save_location: str
+    server_as_foldername: str,
+    save_location: Path
 ) -> None:
     """
     In case the xmlupload must be interrupted, e.g. because of an error that could not be handled, or due to keyboard
@@ -764,8 +795,8 @@ def _handle_upload_error(
         stashed_resptr_props: all resptr props that have been stashed
         proj_shortcode: shortcode of the project the data belongs to
         onto_name: name of the ontology the data references
-        server: the server which the data is uploaded onto
-        save_location: path to the directory where dsp-tools should save logs (OS dependent)
+        server_as_foldername: the server the data is uploaded to, in a form that can be used as a folder name
+        save_location: path to the directory where the logs should be saved
 
     Returns:
         None
@@ -775,20 +806,8 @@ def _handle_upload_error(
           f'\nxmlupload must be aborted because of an error')
 
     timestamp_str = datetime.now().strftime("%Y-%m-%d_%H%M%S")
-    server_substitutions = {
-        r"https?://": "",
-        r"^api\..+": "",
-        r":\d{4}/?$": "",
-        r"0.0.0.0": "localhost"
-    }
-    for pattern, repl in server_substitutions.items():
-        server = re.sub(pattern, repl, server)
-
-    if save_location == ".":
-        save_location_full = f"xmluploads/{server}/{proj_shortcode}/{onto_name}"
-    else:
-        save_location_full = f"{save_location}/xmluploads/{server}/{proj_shortcode}/{onto_name}"
-    os.makedirs(save_location_full, exist_ok=True)
+    save_location_full = save_location / "xmluploads" / server_as_foldername / proj_shortcode / onto_name
+    save_location_full.mkdir(parents=True, exist_ok=True)
 
     # only stashed properties of resources that already exist in DSP are of interest
     stashed_xml_texts = _purge_stashed_xml_texts(stashed_xml_texts, id2iri_mapping)
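To make the collected schema concrete: the records gathered above serialize into a small table via pandas, which derives the column names from the namedtuple fields. A sketch, not part of the patch, with invented rows:

```python
# Illustrative only: the metric rows below are invented, the schema is the real one.
from collections import namedtuple

import pandas as pd

MetricRecord = namedtuple("MetricRecord", ["res_id", "filetype", "filesize_mb", "event", "duration_ms", "mb_per_sec"])

metrics = [
    MetricRecord("", "", "", "xml upload preparation", 812, ""),
    MetricRecord("res_1", "tif", 14.3, "bitstream upload", 2340, 6.1),
    MetricRecord("res_1", "tif", 14.3, "resource creation", 410, ""),
    MetricRecord("res_1", "tif", 14.3, "looping overhead", 15, ""),
]
df = pd.DataFrame(metrics)  # columns: res_id, filetype, filesize_mb, event, duration_ms, mb_per_sec
print(df)
```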
diff --git a/test/e2e/test_0123_import_scripts.py b/test/e2e/test_0123_import_scripts.py
index e97b49d43..cb0135a11 100644
--- a/test/e2e/test_0123_import_scripts.py
+++ b/test/e2e/test_0123_import_scripts.py
@@ -62,7 +62,8 @@ def test_import_scripts(self) -> None:
             imgdir="knora/dsplib/import_scripts/",
             sipi="http://0.0.0.0:1024",
             verbose=False,
-            incremental=False
+            incremental=False,
+            save_metrics=False
         )
         self.assertTrue(success_on_xmlupload)
 
diff --git a/test/e2e/test_tools.py b/test/e2e/test_tools.py
index a0f58c735..af575a8d0 100644
--- a/test/e2e/test_tools.py
+++ b/test/e2e/test_tools.py
@@ -251,7 +251,8 @@ def test_xml_upload(self) -> None:
             imgdir=self.imgdir,
             sipi=self.sipi,
             verbose=False,
-            incremental=False)
+            incremental=False,
+            save_metrics=False)
         self.assertTrue(result_minimal)
 
         result_systematic = xml_upload(
@@ -262,7 +263,8 @@ def test_xml_upload(self) -> None:
             imgdir=self.imgdir,
             sipi=self.sipi,
             verbose=False,
-            incremental=False)
+            incremental=False,
+            save_metrics=False)
         self.assertTrue(result_systematic)
 
         mapping_file = ""
@@ -287,7 +289,8 @@ def test_xml_upload(self) -> None:
             imgdir=self.imgdir,
             sipi=self.sipi,
             verbose=True,
-            incremental=True
+            incremental=True,
+            save_metrics=False
         )
         self.assertTrue(result_replaced)
         self.assertTrue(all([not f.name.startswith("stashed_text_properties_") for f in os.scandir(".")]))
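Finally, the CSV written by `xml_upload()` can be summarized with pandas after a run. A sketch, not part of the patch: the file name below is a made-up instance of the `{timestamp}_metrics_{server}_{xmlfile}.csv` pattern, and `index_col=0` accounts for the unnamed index column that `to_csv()` writes by default.

```python
# Sketch: summarize a metrics CSV produced by an `xmlupload --metrics` run.
# The file name is a hypothetical example of the naming pattern used above.
import pandas as pd

df = pd.read_csv("metrics/2022-01-01_120000_metrics_localhost_data.csv", index_col=0)

# Total time per event type ("xml upload preparation", "bitstream upload",
# "resource creation", "looping overhead")
print(df.groupby("event")["duration_ms"].sum())

# Upload throughput per file type, for bitstream uploads only
uploads = df[df["event"] == "bitstream upload"]
print(uploads.groupby("filetype")["mb_per_sec"].mean())
```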