From fcf838482822223569fd08428c53b5a7464232b3 Mon Sep 17 00:00:00 2001
From: Johannes Nussbaum <39048939+jnussbaum@users.noreply.github.com>
Date: Thu, 23 Jun 2022 17:36:44 +0200
Subject: [PATCH] chore(xmlupload): refactor xmlupload, add unittest (DEV-1043)
(#203)
---
knora/dsplib/models/projectContext.py | 50 ++
knora/dsplib/models/xmlallow.py | 57 ++
knora/dsplib/models/xmlbitstream.py | 26 +
knora/dsplib/models/xmlerror.py | 9 +
knora/dsplib/models/xmlpermission.py | 54 ++
knora/dsplib/models/xmlproperty.py | 65 ++
knora/dsplib/models/xmlresource.py | 218 +++++++
knora/dsplib/models/xmlvalue.py | 80 +++
knora/dsplib/utils/xml_upload.py | 615 ++----------------
.../test_convert_ark_v0_to_resource_iri.py | 34 -
test/unittests/test_xmlupload.py | 118 ++++
testdata/test-data.xml | 12 +
12 files changed, 735 insertions(+), 603 deletions(-)
create mode 100644 knora/dsplib/models/projectContext.py
create mode 100644 knora/dsplib/models/xmlallow.py
create mode 100644 knora/dsplib/models/xmlbitstream.py
create mode 100644 knora/dsplib/models/xmlerror.py
create mode 100644 knora/dsplib/models/xmlpermission.py
create mode 100644 knora/dsplib/models/xmlproperty.py
create mode 100644 knora/dsplib/models/xmlresource.py
create mode 100644 knora/dsplib/models/xmlvalue.py
delete mode 100644 test/unittests/test_convert_ark_v0_to_resource_iri.py
create mode 100644 test/unittests/test_xmlupload.py
diff --git a/knora/dsplib/models/projectContext.py b/knora/dsplib/models/projectContext.py
new file mode 100644
index 000000000..c49bc9724
--- /dev/null
+++ b/knora/dsplib/models/projectContext.py
@@ -0,0 +1,50 @@
+from typing import Optional
+
+from knora.dsplib.models.connection import Connection
+from knora.dsplib.models.group import Group
+from knora.dsplib.models.helpers import BaseError
+from knora.dsplib.models.project import Project
+
+
+class ProjectContext:
+ """Represents the project context"""
+
+ _projects: list[Project]
+ _project_map: dict[str, str] # dictionary of (project name:project IRI) pairs
+ _inv_project_map: dict[str, str] # dictionary of (project IRI:project name) pairs
+ _groups: Optional[list[Group]]
+ _group_map: Optional[dict[str, str]]
+ _shortcode: Optional[str]
+ _project_name: Optional[str]
+
+ def __init__(self, con: Connection, shortcode: Optional[str] = None):
+ self._shortcode = shortcode
+ self._projects = Project.getAllProjects(con=con)
+ self._project_map: dict[str, str] = {x.shortname: x.id for x in self._projects}
+ self._inv_project_map: dict[str, str] = {x.id: x.shortname for x in self._projects}
+ try:
+ self._groups = Group.getAllGroups(con=con)
+ except BaseError:
+ self._groups = None
+ if self._groups:
+ self._group_map: dict[str, str] = {self._inv_project_map[x.project] + ':' + x.name: x.id for x in
+ self._groups}
+ else:
+ self._group_map = None
+ self._project_name = None
+ # get the project name from the shortcode
+ if self._shortcode:
+ for p in self._projects:
+ if p.shortcode == self._shortcode:
+ self._project_name = p.shortname
+ break
+
+ @property
+ def group_map(self) -> dict[str, str]:
+ """Dictionary of (project:group name) and (group id) pairs of all groups in project"""
+ return self._group_map
+
+ @property
+ def project_name(self) -> Optional[str]:
+ """Name of the project"""
+ return self._project_name
diff --git a/knora/dsplib/models/xmlallow.py b/knora/dsplib/models/xmlallow.py
new file mode 100644
index 000000000..6fa5e275a
--- /dev/null
+++ b/knora/dsplib/models/xmlallow.py
@@ -0,0 +1,57 @@
+from lxml import etree
+
+from knora.dsplib.models.projectContext import ProjectContext
+from knora.dsplib.models.xmlerror import XmlError
+
+
+class XmlAllow:
+ """Represents the allow element of the XML used for data import"""
+
+ _group: str
+ _permission: str
+
+ def __init__(self, node: etree.Element, project_context: ProjectContext) -> None:
+ """
+ Constructor which parses the XML DOM allow element
+
+ Args:
+ node: The DOM node to be processed (represents a single right in a permission set)
+ project_context: Context for DOM node traversal
+
+ Returns:
+ None
+ """
+ tmp = node.attrib['group'].split(':')
+ sysgroups = ['UnknownUser', 'KnownUser', 'ProjectMember', 'Creator', 'ProjectAdmin', 'SystemAdmin']
+ if len(tmp) > 1:
+ if tmp[0]:
+ if tmp[0] == 'knora-admin' and tmp[1] in sysgroups:
+ self._group = node.attrib['group']
+ else:
+ self._group = project_context.group_map.get(node.attrib['group'])
+ if self._group is None:
+ raise XmlError("Group \"{}\" is not known: Cannot find project!".format(node.attrib['group']))
+ else:
+ if project_context.project_name is None:
+ raise XmlError("Project shortcode has not been set in ProjectContext")
+ self._group = project_context.project_name + ':' + tmp[1]
+ else:
+ if tmp[0] in sysgroups:
+ self._group = 'knora-admin:' + node.attrib['group']
+ else:
+ raise XmlError("Group \"{}\" is not known: ".format(node.attrib['group']))
+ self._permission = node.text
+
+ @property
+ def group(self) -> str:
+ """The group specified in the allow element"""
+ return self._group
+
+ @property
+ def permission(self) -> str:
+ """The reference to a set of permissions"""
+ return self._permission
+
+ def print(self) -> None:
+ """Prints the attributes of the XmlAllow instance"""
+ print(" group=", self._group, " permission=", self._permission)
diff --git a/knora/dsplib/models/xmlbitstream.py b/knora/dsplib/models/xmlbitstream.py
new file mode 100644
index 000000000..fd37fd3e1
--- /dev/null
+++ b/knora/dsplib/models/xmlbitstream.py
@@ -0,0 +1,26 @@
+from lxml import etree
+
+
+class XMLBitstream:
+ """Represents a bitstream object (file) of a resource in the XML used for data import"""
+
+ _value: str
+ _permissions: str
+
+ def __init__(self, node: etree.Element) -> None:
+ self._value = node.text
+ self._permissions = node.get('permissions')
+
+ @property
+ def value(self) -> str:
+ """The file path of the bitstream object"""
+ return self._value
+
+ @property
+ def permissions(self) -> str:
+ """Reference to the set of permissions for the bitstream object"""
+ return self._permissions
+
+ def print(self) -> None:
+ """Prints the bitstream object and its attributes."""
+ print(' Bitstream file path: ' + str(self._value))
diff --git a/knora/dsplib/models/xmlerror.py b/knora/dsplib/models/xmlerror.py
new file mode 100644
index 000000000..a28ced476
--- /dev/null
+++ b/knora/dsplib/models/xmlerror.py
@@ -0,0 +1,9 @@
+class XmlError(Exception):
+ """Represents an error raised in the context of the XML import"""
+ _message: str
+
+ def __init__(self, msg: str):
+ self._message = msg
+
+ def __str__(self) -> str:
+ return 'XML-ERROR: ' + self._message
diff --git a/knora/dsplib/models/xmlpermission.py b/knora/dsplib/models/xmlpermission.py
new file mode 100644
index 000000000..62b5cbe65
--- /dev/null
+++ b/knora/dsplib/models/xmlpermission.py
@@ -0,0 +1,54 @@
+from lxml import etree
+
+from knora.dsplib.models.permission import Permissions
+from knora.dsplib.models.projectContext import ProjectContext
+from knora.dsplib.models.xmlallow import XmlAllow
+
+
+class XmlPermission:
+ """Represents the permission set containing several XmlAllow elements in the XML used for data import"""
+
+ _id: str
+ _allows: list[XmlAllow]
+
+ def __init__(self, node: etree.Element, project_context: ProjectContext) -> None:
+ """
+ Constructor which parses an XML DOM permissions element representing a named permission set
+
+ Args:
+ node: The DOM node to be processed (representing a permission set)
+ project_context: Context for DOM node traversal
+ """
+ self._allows = []
+ self._id = node.attrib['id']
+ for allow_node in node:
+ self._allows.append(XmlAllow(allow_node, project_context))
+
+ @property
+ def id(self) -> str:
+ """The id of the permission set, p.ex. res-default"""
+ return self._id
+
+ @property
+ def allows(self) -> list[XmlAllow]:
+ """List of XmlAllow elements defining permissions for specific groups"""
+ return self._allows
+
+ def get_permission_instance(self) -> Permissions:
+ """Returns a Permissions instance built from the allow elements of this permission set"""
+ permissions = Permissions()
+ for allow in self._allows:
+ permissions.add(allow.permission, allow.group)
+ return permissions
+
+ def __str__(self) -> str:
+ allow_str: list[str] = []
+ for allow in self._allows:
+ allow_str.append("{} {}".format(allow.permission, allow.group))
+ return '|'.join(allow_str)
+
+ def print(self) -> None:
+ """Prints the permission set"""
+ print('Permission: ', self._id)
+ for a in self._allows:
+ a.print()
diff --git a/knora/dsplib/models/xmlproperty.py b/knora/dsplib/models/xmlproperty.py
new file mode 100644
index 000000000..b22219aa7
--- /dev/null
+++ b/knora/dsplib/models/xmlproperty.py
@@ -0,0 +1,65 @@
+from typing import Optional
+
+from lxml import etree
+
+from knora.dsplib.models.xmlvalue import XMLValue
+from knora.dsplib.models.xmlerror import XmlError
+
+
+class XMLProperty:
+ """Represents a property of a resource in the XML used for data import"""
+
+ _name: str
+ _valtype: str
+ _values: list[XMLValue]
+
+ def __init__(self, node: etree.Element, valtype: str, default_ontology: Optional[str] = None):
+ """
+ The constructor for the DSP property
+
+ Args:
+ node: the property node, p.ex. a decimal-prop element
+ valtype: the type of value given by the name of the property node, p.ex. decimal in decimal-prop
+ default_ontology: the name of the ontology
+ """
+ # get the property name which is in format namespace:propertyname, p.ex. rosetta:hasName
+ tmp_prop_name = node.attrib['name'].split(':')
+ if len(tmp_prop_name) > 1:
+ if tmp_prop_name[0]:
+ self._name = node.attrib['name']
+ else:
+ # replace an empty namespace with the default ontology name
+ self._name = default_ontology + ':' + tmp_prop_name[1]
+ else:
+ self._name = 'knora-admin:' + tmp_prop_name[0]
+ listname = node.attrib.get('list') # save the list name if given (only for lists)
+ self._valtype = valtype
+ self._values = []
+
+ # parse the subnodes of the property nodes which contain the actual values of the property
+ for subnode in node:
+ if subnode.tag == valtype: # the subnode must correspond to the expected value type
+ self._values.append(XMLValue(subnode, valtype, listname))
+ else:
+ raise XmlError(f"ERROR Unexpected tag: '{subnode.tag}'. Property may contain only value tags!")
+
+ @property
+ def name(self) -> str:
+ """The name of the property"""
+ return self._name
+
+ @property
+ def valtype(self) -> str:
+ """The value type of the property"""
+ return self._valtype
+
+ @property
+ def values(self) -> list[XMLValue]:
+ """List of values of this property"""
+ return self._values
+
+ def print(self) -> None:
+ """Prints the property."""
+ print(' Property: {} Type: {}'.format(self._name, self._valtype))
+ for value in self._values:
+ value.print()
diff --git a/knora/dsplib/models/xmlresource.py b/knora/dsplib/models/xmlresource.py
new file mode 100644
index 000000000..f68ed1a7c
--- /dev/null
+++ b/knora/dsplib/models/xmlresource.py
@@ -0,0 +1,218 @@
+from typing import Optional, Union
+
+from lxml import etree
+
+from knora.dsplib.models.xmlbitstream import XMLBitstream
+from knora.dsplib.models.helpers import BaseError
+from knora.dsplib.models.permission import Permissions
+from knora.dsplib.models.value import KnoraStandoffXml
+from knora.dsplib.models.xmlproperty import XMLProperty
+
+
+class XMLResource:
+ """Represents a resource in the XML used for data import"""
+
+ _id: str
+ _iri: Optional[str]
+ _ark: Optional[str]
+ _label: str
+ _restype: str
+ _permissions: Optional[str]
+ _bitstream: Optional[XMLBitstream]
+ _properties: list[XMLProperty]
+
+ def __init__(self, node: etree.Element, default_ontology: str) -> None:
+ """
+ Constructor that parses a resource node from the XML DOM
+
+ Args:
+ node: The DOM node to be processed representing a resource (which is a child of the knora element)
+ default_ontology: The default ontology (given in the attribute default-ontology of the knora element)
+
+ Returns:
+ None
+ """
+ self._id = node.attrib['id']
+ self._iri = node.attrib.get('iri')
+ self._ark = node.attrib.get('ark')
+ self._label = node.attrib['label']
+ # get the resource type which is in format namespace:resourcetype, p.ex. rosetta:Image
+ tmp_res_type = node.attrib['restype'].split(':')
+ if len(tmp_res_type) > 1:
+ if tmp_res_type[0]:
+ self._restype = node.attrib['restype']
+ else:
+ # replace an empty namespace with the default ontology name
+ self._restype = default_ontology + ':' + tmp_res_type[1]
+ else:
+ self._restype = 'knora-admin:' + tmp_res_type[0]
+ self._permissions = node.attrib.get("permissions")
+ self._bitstream = None
+ self._properties = []
+ for subnode in node:
+ if subnode.tag is etree.Comment:
+ continue
+ elif subnode.tag == 'bitstream':
+ self._bitstream = XMLBitstream(subnode)
+ else:
+ # get the property type which is in format type-prop, p.ex. decimal-prop
+ prop_type, _ = subnode.tag.split('-')
+ self._properties.append(XMLProperty(subnode, prop_type, default_ontology))
+
+ @property
+ def id(self) -> str:
+ """The unique id of the resource"""
+ return self._id
+
+ @property
+ def iri(self) -> Optional[str]:
+ """The custom IRI of the resource"""
+ return self._iri
+
+ @property
+ def ark(self) -> Optional[str]:
+ """The custom ARK of the resource"""
+ return self._ark
+
+ @property
+ def label(self) -> str:
+ """The label of the resource"""
+ return self._label
+
+ @property
+ def restype(self) -> str:
+ """The type of the resource"""
+ return self._restype
+
+ @property
+ def permissions(self) -> Optional[str]:
+ """The reference to the permissions set for this resource"""
+ return self._permissions
+
+ @property
+ def bitstream(self) -> Optional[XMLBitstream]:
+ """The bitstream object belonging to the resource"""
+ return self._bitstream
+
+ @property
+ def properties(self) -> list[XMLProperty]:
+ return self._properties
+
+ @properties.setter
+ def properties(self, new_properties: list[XMLProperty]) -> None:
+ self._properties = new_properties
+
+ def print(self) -> None:
+ """Prints the resource and its attributes."""
+ print(f'Resource: id={self._id}, restype: {self._restype}, label: {self._label}')
+ if self._bitstream:
+ print(' Bitstream: ' + self._bitstream.value)
+ for prop in self._properties:
+ prop.print()
+
+ def get_props_with_links(self) -> list[XMLProperty]:
+ """
+ Get a list of all XMLProperties that have an outgoing link to another resource, be it a resptr-prop link
+ or a standoff link in a text.
+ """
+ link_properties: list[XMLProperty] = []
+ for prop in self._properties:
+ if prop.valtype == 'resptr':
+ link_properties.append(prop)
+ elif prop.valtype == 'text':
+ for value in prop.values:
+ if value.resrefs:
+ link_properties.append(prop)
+ break
+ return link_properties
+
+ def get_resptrs(self) -> list[str]:
+ """
+ Get a list of all resource id's that are referenced by this resource
+
+ Returns:
+ List of resources identified by their unique id's (as given in the XML)
+ """
+ resptrs: list[str] = []
+ for prop in self._properties:
+ if prop.valtype == 'resptr':
+ for value in prop.values:
+ resptrs.append(str(value.value))
+ elif prop.valtype == 'text':
+ for value in prop.values:
+ if value.resrefs:
+ resptrs.extend(value.resrefs)
+ return resptrs
+
+ def get_propvals(
+ self,
+ resiri_lookup: dict[str, str],
+ permissions_lookup: dict[str, Permissions]
+ ) -> dict[str, Union[list[Union[str, dict[str, str]]], str, dict[str, str]]]:
+ """
+ Get a dictionary of the property names and their values. Replace the internal ids by their IRI first.
+
+ Args:
+ resiri_lookup: Is used to resolve internal unique ids of resources to real IRIs
+ permissions_lookup: Is used to resolve the permission id's to permission sets
+
+ Returns:
+ A dict with the property name as key and a single value or a list of values as value. This dict represents the JSON structure
+ that Knora.create_resource() expects.
+ """
+ prop_data = {}
+ for prop in self._properties:
+ vals: list[Union[str, dict[str, str]]] = []
+ for value in prop.values:
+ if prop.valtype == 'resptr': # we have a resptr, therefore simple lookup or IRI
+ iri = resiri_lookup.get(value.value)
+ if iri:
+ v = iri
+ else:
+ v = value.value # if we do not find the id, we assume it's a valid DSP IRI
+ elif prop.valtype == 'text':
+ if isinstance(value.value, KnoraStandoffXml):
+ iri_refs = value.value.get_all_iris()
+ for iri_ref in iri_refs:
+ res_id = iri_ref.split(':')[1]
+ iri = resiri_lookup.get(res_id)
+ if not iri:
+ raise BaseError(f'Resource cannot be created, because it contains a salsah-Link to '
+ f'the following invalid resource: {res_id}.')
+ value.value.replace(iri_ref, iri)
+ v = value.value
+ else:
+ v = value.value
+
+ if value.comment is None and value.permissions is None:
+ # no comment or permissions
+ vals.append(v)
+ else:
+ # we have comment or permissions
+ tmp = {'value': v}
+ if value.comment:
+ tmp['comment'] = value.comment
+ if value.permissions:
+ tmp['permissions'] = permissions_lookup.get(value.permissions)
+ vals.append(tmp)
+ prop_data[prop.name] = vals if len(vals) > 1 else vals[0]
+ return prop_data
+
+ def get_bitstream(self, internal_file_name_bitstream: str, permissions_lookup: dict[str, Permissions]) -> Optional[dict[str, Union[str, Permissions]]]:
+ """
+ Get the bitstream object belonging to the resource
+
+ Args:
+ internal_file_name_bitstream: Internal file name of bitstream object as returned from Sipi
+ permissions_lookup: Is used to resolve the permission id's to permission sets
+
+ Returns:
+ A dict of the bitstream object
+ """
+ tmp = None
+ if self._bitstream:
+ bitstream = self._bitstream
+ tmp = {'value': bitstream.value, 'internal_file_name': internal_file_name_bitstream}
+ if bitstream.permissions:
+ tmp['permissions'] = permissions_lookup.get(bitstream.permissions)
+ return tmp
diff --git a/knora/dsplib/models/xmlvalue.py b/knora/dsplib/models/xmlvalue.py
new file mode 100644
index 000000000..de425ef24
--- /dev/null
+++ b/knora/dsplib/models/xmlvalue.py
@@ -0,0 +1,80 @@
+from typing import Union, Optional
+
+from lxml import etree
+
+from knora.dsplib.models.value import KnoraStandoffXml
+
+
+class XMLValue:
+ """Represents a value of a resource property in the XML used for data import"""
+
+ _value: Union[str, KnoraStandoffXml]
+ _resrefs: Optional[list[str]]
+ _comment: str
+ _permissions: str
+ _is_richtext: bool
+
+ def __init__(self, node: etree.Element, val_type: str, listname: Optional[str] = None) -> None:
+
+ self._resrefs = None
+ self._comment = node.get('comment')
+ self._permissions = node.get('permissions')
+ if node.get('encoding') == 'xml':
+ node.attrib.clear()
+ xmlstr = etree.tostring(node, encoding="unicode", method="xml")
+ xmlstr = xmlstr.replace('', '')
+ xmlstr = xmlstr.replace('', '')
+ self._value = KnoraStandoffXml(xmlstr)
+ tmp_id_list = self._value.get_all_iris()
+ if tmp_id_list:
+ refs = set()
+ for tmp_id in tmp_id_list:
+ refs.add(tmp_id.split(':')[1])
+ self._resrefs = list(refs)
+ else:
+ if val_type == 'list':
+ self._value = listname + ':' + "".join(node.itertext())
+ else:
+ self._value = "".join(node.itertext())
+
+ @property
+ def value(self) -> Union[str, KnoraStandoffXml]:
+ """The actual value of the value instance"""
+ return self._value
+
+ @value.setter
+ def value(self, value: Union[str, KnoraStandoffXml]) -> None:
+ self._value = value
+
+ @property
+ def resrefs(self) -> Optional[list[str]]:
+ """List of resource references"""
+ return self._resrefs
+
+ @resrefs.setter
+ def resrefs(self, resrefs: Optional[list[str]]) -> None:
+ self._resrefs = resrefs
+
+ @property
+ def comment(self) -> str:
+ """Comment about the value"""
+ return self._comment
+
+ @property
+ def permissions(self) -> str:
+ """Reference to the set of permissions for the value"""
+ return self._permissions
+
+ @property
+ def is_richtext(self) -> bool:
+ """true if text value is of type richtext, false otherwise"""
+ return self._is_richtext
+
+ def print(self) -> None:
+ """Prints the value and its attributes."""
+ print(' Value: ' + str(self._value))
+ if self._comment:
+ print(' Comment:' + self._comment)
+ if self._resrefs is not None:
+ for i in self._resrefs:
+ print(' res_ref: ' + i)
diff --git a/knora/dsplib/utils/xml_upload.py b/knora/dsplib/utils/xml_upload.py
index bc3c8b688..a00b19423 100644
--- a/knora/dsplib/utils/xml_upload.py
+++ b/knora/dsplib/utils/xml_upload.py
@@ -14,542 +14,19 @@
from lxml import etree
from requests import RequestException
+from knora.dsplib.models.projectContext import ProjectContext
from knora.dsplib.models.connection import Connection
-from knora.dsplib.models.group import Group
from knora.dsplib.models.helpers import BaseError
from knora.dsplib.models.permission import Permissions
-from knora.dsplib.models.project import Project
from knora.dsplib.models.resource import ResourceInstanceFactory, ResourceInstance, KnoraStandoffXmlEncoder
from knora.dsplib.models.sipi import Sipi
from knora.dsplib.models.value import KnoraStandoffXml
+from knora.dsplib.models.xmlpermission import XmlPermission
+from knora.dsplib.models.xmlproperty import XMLProperty
+from knora.dsplib.models.xmlresource import XMLResource
-class XmlError(BaseException):
- """Represents an error raised in the context of the XML import"""
- _message: str
-
- def __init__(self, msg: str):
- self._message = msg
-
- def __str__(self) -> str:
- return 'XML-ERROR: ' + self._message
-
-
-class ProjectContext:
- """Represents the project context"""
-
- _projects: list[Project]
- _project_map: dict[str, str] # dictionary of (project name:project IRI) pairs
- _inv_project_map: dict[str, str] # dictionary of (project IRI:project name) pairs
- _groups: Optional[list[Group]]
- _group_map: Optional[dict[str, str]]
- _shortcode: Optional[str]
- _project_name: Optional[str]
-
- def __init__(self, con: Connection, shortcode: Optional[str] = None):
- self._shortcode = shortcode
- self._projects = Project.getAllProjects(con=con)
- self._project_map: dict[str, str] = {x.shortname: x.id for x in self._projects}
- self._inv_project_map: dict[str, str] = {x.id: x.shortname for x in self._projects}
- try:
- self._groups = Group.getAllGroups(con=con)
- except BaseError:
- self._groups = None
- if self._groups:
- self._group_map: dict[str, str] = {self._inv_project_map[x.project] + ':' + x.name: x.id for x in
- self._groups}
- else:
- self._group_map = None
- self._project_name = None
- # get the project name from the shortcode
- if self._shortcode:
- for p in self._projects:
- if p.shortcode == self._shortcode:
- self._project_name = p.shortname
- break
-
- @property
- def group_map(self) -> dict[str, str]:
- """Dictionary of (project:group name) and (group id) pairs of all groups in project"""
- return self._group_map
-
- @property
- def project_name(self) -> Optional[str]:
- """Name of the project"""
- return self._project_name
-
-
-class XMLBitstream:
- """Represents a bitstream object (file) of a resource in the XML used for data import"""
-
- _value: str
- _permissions: str
-
- def __init__(self, node: etree.Element) -> None:
- self._value = node.text
- self._permissions = node.get('permissions')
-
- @property
- def value(self) -> str:
- """The file path of the bitstream object"""
- return self._value
-
- @property
- def permissions(self) -> str:
- """Reference to the set of permissions for the bitstream object"""
- return self._permissions
-
- def print(self) -> None:
- """Prints the bitstream object and its attributes."""
- print(' Bitstream file path: ' + str(self._value))
-
-
-class XMLValue:
- """Represents a value of a resource property in the XML used for data import"""
-
- _value: Union[str, KnoraStandoffXml]
- _resrefs: Optional[list[str]]
- _comment: str
- _permissions: str
- _is_richtext: bool
-
- def __init__(self, node: etree.Element, val_type: str, listname: Optional[str] = None) -> None:
-
- self._resrefs = None
- self._comment = node.get('comment')
- self._permissions = node.get('permissions')
- if node.get('encoding') == 'xml':
- node.attrib.clear()
- xmlstr = etree.tostring(node, encoding="unicode", method="xml")
- xmlstr = xmlstr.replace('', '')
- xmlstr = xmlstr.replace('', '')
- self._value = KnoraStandoffXml(xmlstr)
- tmp_id_list = self._value.get_all_iris()
- if tmp_id_list:
- refs = set()
- for tmp_id in tmp_id_list:
- refs.add(tmp_id.split(':')[1])
- self._resrefs = list(refs)
- else:
- if val_type == 'list':
- self._value = listname + ':' + "".join(node.itertext())
- else:
- self._value = "".join(node.itertext())
-
- @property
- def value(self) -> Union[str, KnoraStandoffXml]:
- """The actual value of the value instance"""
- return self._value
-
- @value.setter
- def value(self, value: Union[str, KnoraStandoffXml]) -> None:
- self._value = value
-
- @property
- def resrefs(self) -> Optional[list[str]]:
- """List of resource references"""
- return self._resrefs
-
- @resrefs.setter
- def resrefs(self, resrefs: Optional[list[str]]) -> None:
- self._resrefs = resrefs
-
- @property
- def comment(self) -> str:
- """Comment about the value"""
- return self._comment
-
- @property
- def permissions(self) -> str:
- """Reference to the set of permissions for the value"""
- return self._permissions
-
- @property
- def is_richtext(self) -> bool:
- """true if text value is of type richtext, false otherwise"""
- return self._is_richtext
-
- def print(self) -> None:
- """Prints the value and its attributes."""
- print(' Value: ' + str(self._value))
- if self._comment:
- print(' Comment:' + self._comment)
- if self._resrefs is not None:
- for i in self._resrefs:
- print(' res_ref: ' + i)
-
-
-class XMLProperty:
- """Represents a property of a resource in the XML used for data import"""
-
- _name: str
- _valtype: str
- _values: list[XMLValue]
-
- def __init__(self, node: etree.Element, valtype: str, default_ontology: Optional[str] = None):
- """
- The constructor for the DSP property
-
- Args:
- node: the property node, p.ex.
- valtype: the type of value given by the name of the property node, p.ex. decimal in
- default_ontology: the name of the ontology
- """
- # get the property name which is in format namespace:propertyname, p.ex. rosetta:hasName
- tmp_prop_name = node.attrib['name'].split(':')
- if len(tmp_prop_name) > 1:
- if tmp_prop_name[0]:
- self._name = node.attrib['name']
- else:
- # replace an empty namespace with the default ontology name
- self._name = default_ontology + ':' + tmp_prop_name[1]
- else:
- self._name = 'knora-admin:' + tmp_prop_name[0]
- listname = node.attrib.get('list') # safe the list name if given (only for lists)
- self._valtype = valtype
- self._values = []
-
- # parse the subnodes of the property nodes which contain the actual values of the property
- for subnode in node:
- if subnode.tag == valtype: # the subnode must correspond to the expected value type
- self._values.append(XMLValue(subnode, valtype, listname))
- else:
- raise XmlError(f"ERROR Unexpected tag: '{subnode.tag}'. Property may contain only value tags!")
-
- @property
- def name(self) -> str:
- """The name of the property"""
- return self._name
-
- @property
- def valtype(self) -> str:
- """The value type of the property"""
- return self._valtype
-
- @property
- def values(self) -> list[XMLValue]:
- """List of values of this property"""
- return self._values
-
- def print(self) -> None:
- """Prints the property."""
- print(' Property: {} Type: {}'.format(self._name, self._valtype))
- for value in self._values:
- value.print()
-
-
-class XMLResource:
- """Represents a resource in the XML used for data import"""
-
- _id: str
- _iri: Optional[str]
- _ark: Optional[str]
- _label: str
- _restype: str
- _permissions: Optional[str]
- _bitstream: Optional[XMLBitstream]
- _properties: list[XMLProperty]
-
- def __init__(self, node: etree.Element, default_ontology: Optional[str] = None) -> None:
- """
- Constructor that parses a resource node from the XML DOM
-
- Args:
- node: The DOM node to be processed representing a resource (which is a child of the knora element)
- default_ontology: The default ontology (given in the attribute default-ontology of the knora element)
-
- Returns:
- None
- """
- self._id = node.attrib['id']
- self._iri = node.attrib.get('iri')
- self._ark = node.attrib.get('ark')
- self._label = node.attrib['label']
- # get the resource type which is in format namespace:resourcetype, p.ex. rosetta:Image
- tmp_res_type = node.attrib['restype'].split(':')
- if len(tmp_res_type) > 1:
- if tmp_res_type[0]:
- self._restype = node.attrib['restype']
- else:
- # replace an empty namespace with the default ontology name
- self._restype = default_ontology + ':' + tmp_res_type[1]
- else:
- self._restype = 'knora-admin:' + tmp_res_type[0]
- self._permissions = node.attrib.get("permissions")
- self._bitstream = None
- self._properties = []
- for subnode in node:
- if subnode.tag is etree.Comment:
- continue
- elif subnode.tag == 'bitstream':
- self._bitstream = XMLBitstream(subnode)
- else:
- # get the property type which is in format type-prop, p.ex.
- prop_type, _ = subnode.tag.split('-')
- self._properties.append(XMLProperty(subnode, prop_type, default_ontology))
-
- @property
- def id(self) -> str:
- """The unique id of the resource"""
- return self._id
-
- @property
- def iri(self) -> Optional[str]:
- """The custom IRI of the resource"""
- return self._iri
-
- @property
- def ark(self) -> Optional[str]:
- """The custom ARK of the resource"""
- return self._ark
-
- @property
- def label(self) -> str:
- """The label of the resource"""
- return self._label
-
- @property
- def restype(self) -> str:
- """The type of the resource"""
- return self._restype
-
- @property
- def permissions(self) -> Optional[str]:
- """The reference to the permissions set for this resource"""
- return self._permissions
-
- @property
- def bitstream(self) -> Optional[XMLBitstream]:
- """The bitstream object belonging to the resource"""
- return self._bitstream
-
- @property
- def properties(self) -> list[XMLProperty]:
- return self._properties
-
- @properties.setter
- def properties(self, new_properties: list[XMLProperty]) -> None:
- self._properties = new_properties
-
- def print(self) -> None:
- """Prints the resource and its attributes."""
- print(f'Resource: id={self._id}, restype: {self._restype}, label: {self._label}')
- if self._bitstream:
- print(' Bitstream: ' + self._bitstream.value)
- for prop in self._properties:
- prop.print()
-
- def get_props_with_links(self) -> list[XMLProperty]:
- """
- Get a list of all XMLProperties that have an outgoing link to another resource, be it a resptr-prop link
- or a standoff link in a text.
- """
- link_properties: list[XMLProperty] = []
- for prop in self._properties:
- if prop.valtype == 'resptr':
- link_properties.append(prop)
- elif prop.valtype == 'text':
- for value in prop.values:
- if value.resrefs:
- link_properties.append(prop)
- break
- return link_properties
-
- def get_resptrs(self) -> list[str]:
- """
- Get a list of all resource id's that are referenced by this resource
-
- Returns:
- List of resources identified by their unique id's (as given in the XML)
- """
- resptrs: list[str] = []
- for prop in self._properties:
- if prop.valtype == 'resptr':
- for value in prop.values:
- resptrs.append(str(value.value))
- elif prop.valtype == 'text':
- for value in prop.values:
- if value.resrefs:
- resptrs.extend(value.resrefs)
- return resptrs
-
- def get_propvals(
- self,
- resiri_lookup: dict[str, str],
- permissions_lookup: dict[str, Permissions]
- ) -> dict[str, Union[list[Union[str, dict[str, str]]], str, dict[str, str]]]:
- """
- Get a dictionary of the property names and their values. Replace the internal ids by their IRI first.
-
- Args:
- resiri_lookup: Is used to solve internal unique id's of resources to real IRI's
- permissions_lookup: Is used to resolve the permission id's to permission sets
-
- Returns:
- A dict of values with the property name as key and a single value. This dict represents the JSON structure
- that Knora.create_resource() expects.
- """
- prop_data = {}
- for prop in self._properties:
- vals: list[Union[str, dict[str, str]]] = []
- for value in prop.values:
- if prop.valtype == 'resptr': # we have a resptr, therefore simple lookup or IRI
- iri = resiri_lookup.get(value.value)
- if iri:
- v = iri
- else:
- v = value.value # if we do not find the id, we assume it's a valid DSP IRI
- elif prop.valtype == 'text':
- if isinstance(value.value, KnoraStandoffXml):
- iri_refs = value.value.get_all_iris()
- for iri_ref in iri_refs:
- res_id = iri_ref.split(':')[1]
- iri = resiri_lookup.get(res_id)
- if not iri:
- raise BaseError(f'Resource cannot be created, because it contains a salsah-Link to '
- f'the following invalid resource: {res_id}.')
- value.value.replace(iri_ref, iri)
- v = value.value
- else:
- v = value.value
-
- if value.comment is None and value.permissions is None:
- # no comment or permissions
- vals.append(v)
- else:
- # we have comment or permissions
- tmp = {'value': v}
- if value.comment:
- tmp['comment'] = value.comment
- if value.permissions:
- tmp['permissions'] = permissions_lookup.get(value.permissions)
- vals.append(tmp)
- prop_data[prop.name] = vals if len(vals) > 1 else vals[0]
- return prop_data
-
- def get_bitstream(self, internal_file_name_bitstream: str, permissions_lookup: dict[str, Permissions]) -> Optional[dict[str, Union[str, Permissions]]]:
- """
- Get the bitstream object belonging to the resource
-
- Args:
- internal_file_name_bitstream: Internal file name of bitstream object as returned from Sipi
- permissions_lookup: Is used to resolve the permission id's to permission sets
-
- Returns:
- A dict of the bitstream object
- """
- tmp = None
- if self._bitstream:
- bitstream = self._bitstream
- tmp = {'value': bitstream.value, 'internal_file_name': internal_file_name_bitstream}
- if bitstream.permissions:
- tmp['permissions'] = permissions_lookup.get(bitstream.permissions)
- return tmp
-
-
-class XmlAllow:
- """Represents the allow element of the XML used for data import"""
-
- _group: str
- _permission: str
-
- def __init__(self, node: etree.Element, project_context: ProjectContext) -> None:
- """
- Constructor which parses the XML DOM allow element
-
- Args:
- node: The DOM node to be processed (represents a single right in a permission set)
- project_context: Context for DOM node traversal
-
- Returns:
- None
- """
- tmp = node.attrib['group'].split(':')
- sysgroups = ['UnknownUser', 'KnownUser', 'ProjectMember', 'Creator', 'ProjectAdmin', 'SystemAdmin']
- if len(tmp) > 1:
- if tmp[0]:
- if tmp[0] == 'knora-admin' and tmp[1] in sysgroups:
- self._group = node.attrib['group']
- else:
- self._group = project_context.group_map.get(node.attrib['group'])
- if self._group is None:
- raise XmlError("Group \"{}\" is not known: Cannot find project!".format(node.attrib['group']))
- else:
- if project_context.project_name is None:
- raise XmlError("Project shortcode has not been set in ProjectContext")
- self._group = project_context.project_name + ':' + tmp[1]
- else:
- if tmp[0] in sysgroups:
- self._group = 'knora-admin:' + node.attrib['group']
- else:
- raise XmlError("Group \"{}\" is not known: ".format(node.attrib['group']))
- self._permission = node.text
-
- @property
- def group(self) -> str:
- """The group specified in the allow element"""
- return self._group
-
- @property
- def permission(self) -> str:
- """The reference to a set of permissions"""
- return self._permission
-
- def print(self) -> None:
- """Prints the attributes of the XmlAllow instance"""
- print(" group=", self._group, " permission=", self._permission)
-
-
-class XmlPermission:
- """Represents the permission set containing several XmlAllow elements in the XML used for data import"""
-
- _id: str
- _allows: list[XmlAllow]
-
- def __init__(self, node: etree.Element, project_context: ProjectContext) -> None:
- """
- Constructor which parses a XML DOM permissions element representing an named permission set
-
- Args:
- node: The DOM node to be processed (representing an a permission set)
- project_context: Context for DOM node traversal
- """
- self._allows = []
- self._id = node.attrib['id']
- for allow_node in node:
- self._allows.append(XmlAllow(allow_node, project_context))
-
- @property
- def id(self) -> str:
- """The id of the permission set, p.ex. res-default"""
- return self._id
-
- @property
- def allows(self) -> list[XmlAllow]:
- """List of XmlAllow elements defining permissions for specific groups"""
- return self._allows
-
- def get_permission_instance(self) -> Permissions:
- """Returns a list of allow elements of this permission instance"""
- permissions = Permissions()
- for allow in self._allows:
- permissions.add(allow.permission, allow.group)
- return permissions
-
- def __str__(self) -> str:
- allow_str: list[str] = []
- for allow in self._allows:
- allow_str.append("{} {}".format(allow.permission, allow.group))
- return '|'.join(allow_str)
-
- def print(self) -> None:
- """Prints the permission set"""
- print('Permission: ', self._id)
- for a in self._allows:
- a.print()
-
-
-def remove_circular_references(resources: list[XMLResource], verbose: bool) -> \
+def _remove_circular_references(resources: list[XMLResource], verbose: bool) -> \
tuple[list[XMLResource],
dict[XMLResource, dict[XMLProperty, dict[str, KnoraStandoffXml]]],
dict[XMLResource, dict[XMLProperty, list[str]]]
@@ -599,7 +76,7 @@ def remove_circular_references(resources: list[XMLResource], verbose: bool) -> \
resources = nok_resources
if len(nok_resources) == nok_len:
# there are circular references. go through all problematic resources, and stash the problematic references.
- nok_resources, ok_res_ids, ok_resources, stashed_xml_texts, stashed_resptr_props = stash_circular_references(
+ nok_resources, ok_res_ids, ok_resources, stashed_xml_texts, stashed_resptr_props = _stash_circular_references(
nok_resources,
ok_res_ids,
ok_resources,
@@ -614,7 +91,7 @@ def remove_circular_references(resources: list[XMLResource], verbose: bool) -> \
return ok_resources, stashed_xml_texts, stashed_resptr_props
-def stash_circular_references(
+def _stash_circular_references(
nok_resources: list[XMLResource],
ok_res_ids: list[str],
ok_resources: list[XMLResource],
@@ -671,7 +148,7 @@ def stash_circular_references(
return nok_resources, ok_res_ids, ok_resources, stashed_xml_texts, stashed_resptr_props
-def validate_xml_against_schema(input_file: str, schema_file: str) -> bool:
+def _validate_xml_against_schema(input_file: str, schema_file: str) -> bool:
"""
Validates an XML file against an XSD schema
@@ -694,7 +171,7 @@ def validate_xml_against_schema(input_file: str, schema_file: str) -> bool:
return False
-def convert_ark_v0_to_resource_iri(ark: str) -> str:
+def _convert_ark_v0_to_resource_iri(ark: str) -> str:
"""
Converts an ARK URL from salsah.org (ARK version 0) of the form ark:/72163/080c-779b9990a0c3f-6e to a DSP resource
IRI of the form http://rdfh.ch/080C/Ef9heHjPWDS7dMR_gGax2Q
@@ -758,7 +235,7 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
# Validate the input XML file
current_dir = os.path.dirname(os.path.realpath(__file__))
schema_file = os.path.join(current_dir, '../schemas/data.xsd')
- if validate_xml_against_schema(input_file, schema_file):
+ if _validate_xml_against_schema(input_file, schema_file):
print("The input data file is syntactically correct and passed validation.")
if validate_only:
exit(0)
@@ -799,7 +276,7 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
# temporarily remove circular references, but only if not an incremental upload
if not incremental:
- resources, stashed_xml_texts, stashed_resptr_props = remove_circular_references(resources, verbose)
+ resources, stashed_xml_texts, stashed_resptr_props = _remove_circular_references(resources, verbose)
else:
stashed_xml_texts = dict()
stashed_resptr_props = dict()
@@ -808,36 +285,36 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
failed_uploads: list[str] = []
try:
- id2iri_mapping, failed_uploads = upload_resources(verbose, resources, imgdir, sipi_server, permissions_lookup,
- resclass_name_2_type, id2iri_mapping, con, failed_uploads)
+ id2iri_mapping, failed_uploads = _upload_resources(verbose, resources, imgdir, sipi_server, permissions_lookup,
+ resclass_name_2_type, id2iri_mapping, con, failed_uploads)
except BaseException as err:
- handle_upload_error(err, input_file, id2iri_mapping, failed_uploads, stashed_xml_texts, stashed_resptr_props)
+ _handle_upload_error(err, input_file, id2iri_mapping, failed_uploads, stashed_xml_texts, stashed_resptr_props)
# update the resources with the stashed XML texts
nonapplied_xml_texts = {}
if len(stashed_xml_texts) > 0:
try:
- nonapplied_xml_texts = upload_stashed_xml_texts(verbose, id2iri_mapping, con, stashed_xml_texts)
+ nonapplied_xml_texts = _upload_stashed_xml_texts(verbose, id2iri_mapping, con, stashed_xml_texts)
except BaseException as err:
- handle_upload_error(err, input_file, id2iri_mapping, failed_uploads, stashed_xml_texts, stashed_resptr_props)
+ _handle_upload_error(err, input_file, id2iri_mapping, failed_uploads, stashed_xml_texts, stashed_resptr_props)
# update the resources with the stashed resptrs
nonapplied_resptr_props = {}
if len(stashed_resptr_props) > 0:
try:
- nonapplied_resptr_props = upload_stashed_resptr_props(verbose, id2iri_mapping, con, stashed_resptr_props)
+ nonapplied_resptr_props = _upload_stashed_resptr_props(verbose, id2iri_mapping, con, stashed_resptr_props)
except BaseException as err:
- handle_upload_error(err, input_file, id2iri_mapping, failed_uploads, stashed_xml_texts, stashed_resptr_props)
+ _handle_upload_error(err, input_file, id2iri_mapping, failed_uploads, stashed_xml_texts, stashed_resptr_props)
# write log files
success = True
timestamp_str = datetime.now().strftime("%Y%m%d-%H%M%S")
- write_id2iri_mapping(input_file, id2iri_mapping, timestamp_str)
+ _write_id2iri_mapping(input_file, id2iri_mapping, timestamp_str)
if len(nonapplied_xml_texts) > 0:
- write_stashed_xml_texts(nonapplied_xml_texts, timestamp_str)
+ _write_stashed_xml_texts(nonapplied_xml_texts, timestamp_str)
success = False
if len(nonapplied_resptr_props) > 0:
- write_stashed_resptr_props(nonapplied_resptr_props, timestamp_str)
+ _write_stashed_resptr_props(nonapplied_resptr_props, timestamp_str)
success = False
if failed_uploads:
print(f"Could not upload the following resources: {failed_uploads}")
@@ -846,7 +323,7 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s
return success
-def upload_resources(
+def _upload_resources(
verbose: bool,
resources: list[XMLResource],
imgdir: str,
@@ -881,12 +358,12 @@ def upload_resources(
resource_iri = resource.iri
if resource.ark:
- resource_iri = convert_ark_v0_to_resource_iri(resource.ark)
+ resource_iri = _convert_ark_v0_to_resource_iri(resource.ark)
# in case of a multimedia resource: upload the multimedia file
resource_bitstream = None
if resource.bitstream:
- img: Optional[dict[Any, Any]] = try_network_action(
+ img: Optional[dict[Any, Any]] = _try_network_action(
object=sipi_server,
method='upload_bitstream',
kwargs={'filepath': os.path.join(imgdir, resource.bitstream.value)},
@@ -901,7 +378,7 @@ def upload_resources(
# create the resource in DSP
resclass_type = resclass_name_2_type[resource.restype]
properties = resource.get_propvals(id2iri_mapping, permissions_lookup)
- resclass_instance: ResourceInstance = try_network_action(
+ resclass_instance: ResourceInstance = _try_network_action(
method=resclass_type,
kwargs={
'con': con,
@@ -917,7 +394,7 @@ def upload_resources(
failed_uploads.append(resource.id)
continue
- created_resource: ResourceInstance = try_network_action(
+ created_resource: ResourceInstance = _try_network_action(
object=resclass_instance,
method='create',
terminal_output_on_failure=f"ERROR while trying to create resource '{resource.label}' ({resource.id})."
@@ -931,7 +408,7 @@ def upload_resources(
return id2iri_mapping, failed_uploads
-def upload_stashed_xml_texts(
+def _upload_stashed_xml_texts(
verbose: bool,
id2iri_mapping: dict[str, str],
con: Connection,
@@ -957,7 +434,7 @@ def upload_stashed_xml_texts(
continue
print(f' Upload XML text(s) of resource "{resource.id}"...')
res_iri = id2iri_mapping[resource.id]
- existing_resource = try_network_action(
+ existing_resource = _try_network_action(
object=con,
method='get',
kwargs={'path': f'/v2/resources/{quote_plus(res_iri)}'},
@@ -978,7 +455,7 @@ def upload_stashed_xml_texts(
pure_text = re.sub(r'(<\?xml.+>\s*)?\s*(.+)\s*<\/text>', r'\2', old_xmltext)
# if the pure text is a hash, the replacement must be made. This hash originates from
- # stash_circular_references(), and identifies the XML texts
+ # _stash_circular_references(), and identifies the XML texts
if pure_text not in hash_to_value:
continue
new_xmltext = hash_to_value[pure_text]
@@ -1004,7 +481,7 @@ def upload_stashed_xml_texts(
jsondata = json.dumps(jsonobj, indent=4, separators=(',', ': '), cls=KnoraStandoffXmlEncoder)
# execute API call
- response = try_network_action(
+ response = _try_network_action(
object=con,
method='put',
kwargs={'path': '/v2/values', 'jsondata': jsondata},
@@ -1018,11 +495,11 @@ def upload_stashed_xml_texts(
print(f' Successfully uploaded xml text of "{link_prop.name}"\n')
# make a purged version of stashed_xml_texts, without empty entries
- nonapplied_xml_texts = purge_stashed_xml_texts(stashed_xml_texts)
+ nonapplied_xml_texts = _purge_stashed_xml_texts(stashed_xml_texts)
return nonapplied_xml_texts
-def purge_stashed_xml_texts(
+def _purge_stashed_xml_texts(
stashed_xml_texts: dict[XMLResource, dict[XMLProperty, dict[str, KnoraStandoffXml]]]
) -> dict[XMLResource, dict[XMLProperty, dict[str, KnoraStandoffXml]]]:
nonapplied_xml_texts: dict[XMLResource, dict[XMLProperty, dict[str, KnoraStandoffXml]]] = {}
@@ -1035,7 +512,7 @@ def purge_stashed_xml_texts(
return nonapplied_xml_texts
-def upload_stashed_resptr_props(
+def _upload_stashed_resptr_props(
verbose: bool,
id2iri_mapping: dict[str, str],
con: Connection,
@@ -1076,7 +553,7 @@ def upload_stashed_resptr_props(
'@context': existing_resource['@context']
}
jsondata = json.dumps(jsonobj, indent=4, separators=(',', ': '))
- response = try_network_action(
+ response = _try_network_action(
object=con,
method='post',
kwargs={'path': '/v2/values', 'jsondata': jsondata},
@@ -1091,11 +568,11 @@ def upload_stashed_resptr_props(
f' Value: {resptr}')
# make a purged version of stashed_resptr_props, without empty entries
- nonapplied_resptr_props = purge_stashed_resptr_props(stashed_resptr_props)
+ nonapplied_resptr_props = _purge_stashed_resptr_props(stashed_resptr_props)
return nonapplied_resptr_props
-def try_network_action(
+def _try_network_action(
terminal_output_on_failure: str,
method: Union[str, Callable[..., Any]],
object: Optional[Any] = None,
@@ -1148,7 +625,7 @@ def try_network_action(
return None
-def purge_stashed_resptr_props(
+def _purge_stashed_resptr_props(
stashed_resptr_props: dict[XMLResource, dict[XMLProperty, list[str]]]
) -> dict[XMLResource, dict[XMLProperty, list[str]]]:
nonapplied_resptr_props: dict[XMLResource, dict[XMLProperty, list[str]]] = {}
@@ -1161,7 +638,7 @@ def purge_stashed_resptr_props(
return nonapplied_resptr_props
-def handle_upload_error(
+def _handle_upload_error(
err: BaseException,
input_file: str,
id2iri_mapping: dict[str, str],
@@ -1192,19 +669,19 @@ def handle_upload_error(
timestamp_str = datetime.now().strftime("%Y%m%d-%H%M%S")
# write id2iri_mapping of the resources that are already in DSP
- write_id2iri_mapping(input_file, id2iri_mapping, timestamp_str)
+ _write_id2iri_mapping(input_file, id2iri_mapping, timestamp_str)
# Both stashes are purged from resources that have not been uploaded yet. Only stashed properties of resources that
# already exist in DSP are of interest.
- stashed_xml_texts_purged = purge_stashed_xml_texts(
+ stashed_xml_texts_purged = _purge_stashed_xml_texts(
{res: propdict for res, propdict in stashed_xml_texts.items() if res.id in id2iri_mapping})
if len(stashed_xml_texts_purged) > 0:
- write_stashed_xml_texts(stashed_xml_texts_purged, timestamp_str)
+ _write_stashed_xml_texts(stashed_xml_texts_purged, timestamp_str)
- stashed_resptr_props_purged = purge_stashed_resptr_props(
+ stashed_resptr_props_purged = _purge_stashed_resptr_props(
{res: propdict for res, propdict in stashed_resptr_props.items() if res.id in id2iri_mapping})
if len(stashed_resptr_props_purged) > 0:
- write_stashed_resptr_props(stashed_resptr_props_purged, timestamp_str)
+ _write_stashed_resptr_props(stashed_resptr_props_purged, timestamp_str)
# print the resources that threw an error when they were tried to be uploaded
if failed_uploads:
@@ -1219,7 +696,7 @@ def handle_upload_error(
raise err
-def write_id2iri_mapping(input_file: str, id2iri_mapping: dict[str, str], timestamp_str: str) -> None:
+def _write_id2iri_mapping(input_file: str, id2iri_mapping: dict[str, str], timestamp_str: str) -> None:
"""
Write the id2iri mapping into a file. The timestamp must be created by the caller, so that different log files can
have an identical timestamp.
@@ -1239,7 +716,7 @@ def write_id2iri_mapping(input_file: str, id2iri_mapping: dict[str, str], timest
outfile.write(json.dumps(id2iri_mapping))
-def write_stashed_xml_texts(
+def _write_stashed_xml_texts(
stashed_xml_texts: dict[XMLResource, dict[XMLProperty, dict[str, KnoraStandoffXml]]],
timestamp_str: str
) -> None:
@@ -1281,7 +758,7 @@ def write_stashed_xml_texts(
f.write(f'\ntext with hash {hash}:\n{str(standoff).strip()}\n')
-def write_stashed_resptr_props(
+def _write_stashed_resptr_props(
stashed_resptr_props: dict[XMLResource, dict[XMLProperty, list[str]]],
timestamp_str: str
) -> None:
diff --git a/test/unittests/test_convert_ark_v0_to_resource_iri.py b/test/unittests/test_convert_ark_v0_to_resource_iri.py
deleted file mode 100644
index 804219446..000000000
--- a/test/unittests/test_convert_ark_v0_to_resource_iri.py
+++ /dev/null
@@ -1,34 +0,0 @@
-"""Unit tests for ARK v0 conversion"""
-
-import unittest
-
-from knora.dsplib.models.helpers import BaseError
-from knora.dsplib.utils.xml_upload import convert_ark_v0_to_resource_iri
-
-
-class TestARKV02IRI(unittest.TestCase):
-
- def test_convert_ark_v0_to_resource_iri(self):
- ark = "ark:/72163/080c-779b9990a0c3f-6e"
- iri = convert_ark_v0_to_resource_iri(ark)
- self.assertEqual("http://rdfh.ch/080C/Ef9heHjPWDS7dMR_gGax2Q", iri)
-
- with self.assertRaises(BaseError) as err1:
- convert_ark_v0_to_resource_iri("ark:/72163/080c-779b999-0a0c3f-6e")
- self.assertEqual(err1.exception.message, "while converting ARK 'ark:/72163/080c-779b999-0a0c3f-6e'. The ARK seems to be invalid")
-
- with self.assertRaises(BaseError) as err2:
- convert_ark_v0_to_resource_iri("ark:/72163/080X-779b9990a0c3f-6e")
- self.assertEqual(err2.exception.message, "while converting ARK 'ark:/72163/080X-779b9990a0c3f-6e'. Invalid project shortcode '080X'")
-
- with self.assertRaises(BaseError) as err3:
- convert_ark_v0_to_resource_iri("ark:/72163/080c1-779b9990a0c3f-6e")
- self.assertEqual(err3.exception.message, "while converting ARK 'ark:/72163/080c1-779b9990a0c3f-6e'. Invalid project shortcode '080C1'")
-
- with self.assertRaises(BaseError) as err3:
- convert_ark_v0_to_resource_iri("ark:/72163/080c-779b99+90a0c3f-6e")
- self.assertEqual(err3.exception.message, "while converting ARK 'ark:/72163/080c-779b99+90a0c3f-6e'. Invalid Salsah ID '779b99+90a0c3f'")
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/test/unittests/test_xmlupload.py b/test/unittests/test_xmlupload.py
new file mode 100644
index 000000000..2bf469006
--- /dev/null
+++ b/test/unittests/test_xmlupload.py
@@ -0,0 +1,118 @@
+"""Unit tests for xmlupload"""
+
+import unittest
+from lxml import etree
+
+from knora.dsplib.models.helpers import BaseError
+from knora.dsplib.utils.xml_upload import _convert_ark_v0_to_resource_iri, _remove_circular_references
+from knora.dsplib.models.xmlresource import XMLResource
+
+
+class TestXMLUpload(unittest.TestCase):
+    """Unit tests for the private helper functions of the xmlupload module."""
+
+    def test_convert_ark_v0_to_resource_iri(self) -> None:
+        """A valid ARK v0 yields the expected resource IRI; each malformed ARK raises a BaseError with its own message."""
+        ark = "ark:/72163/080c-779b9990a0c3f-6e"
+        iri = _convert_ark_v0_to_resource_iri(ark)
+        self.assertEqual("http://rdfh.ch/080C/Ef9heHjPWDS7dMR_gGax2Q", iri)
+
+        with self.assertRaises(BaseError) as err1:
+            _convert_ark_v0_to_resource_iri("ark:/72163/080c-779b999-0a0c3f-6e")
+        self.assertEqual(err1.exception.message, "while converting ARK 'ark:/72163/080c-779b999-0a0c3f-6e'. The ARK seems to be invalid")
+
+        with self.assertRaises(BaseError) as err2:
+            _convert_ark_v0_to_resource_iri("ark:/72163/080X-779b9990a0c3f-6e")
+        self.assertEqual(err2.exception.message, "while converting ARK 'ark:/72163/080X-779b9990a0c3f-6e'. Invalid project shortcode '080X'")
+
+        with self.assertRaises(BaseError) as err3:
+            _convert_ark_v0_to_resource_iri("ark:/72163/080c1-779b9990a0c3f-6e")
+        self.assertEqual(err3.exception.message, "while converting ARK 'ark:/72163/080c1-779b9990a0c3f-6e'. Invalid project shortcode '080C1'")
+
+        with self.assertRaises(BaseError) as err4:
+            _convert_ark_v0_to_resource_iri("ark:/72163/080c-779b99+90a0c3f-6e")
+        self.assertEqual(err4.exception.message, "while converting ARK 'ark:/72163/080c-779b99+90a0c3f-6e'. Invalid Salsah ID '779b99+90a0c3f'")
+
+    def test_remove_circular_references(self) -> None:
+        """The stashes returned for the test data match the hardcoded expectation, and every stashed hash is found in the purged resources."""
+        # create a list of XMLResources from the test data file (path is relative to the working directory)
+        tree = etree.parse('testdata/test-data.xml')
+        for elem in tree.iter():
+            if not isinstance(elem, (etree._Comment, etree._ProcessingInstruction)):
+                elem.tag = etree.QName(elem).localname  # remove namespace URI in the element's name
+        etree.cleanup_namespaces(tree)  # remove unused namespace declarations
+        resources = [XMLResource(x, 'testonto') for x in tree.getroot() if x.tag == "resource"]
+
+        # get the purged resources and the stashes from the function to be tested
+        resources, stashed_xml_texts_original, stashed_resptr_props_original = _remove_circular_references(resources, False)
+
+        # collect all hashes from the stashed xml texts (only used for membership tests below)
+        stashed_xml_texts_hashes = {
+            text_hash for propdict in stashed_xml_texts_original.values()
+            for hashdict in propdict.values() for text_hash in hashdict
+        }
+
+        # make a version of the stashes with the IDs from the XML file instead of the Python objects
+        stashed_xml_texts = {res.id: {prop.name: [str(x) for x in hashdict.values()] for prop, hashdict in propdict.items()}
+                             for res, propdict in stashed_xml_texts_original.items()}
+        stashed_resptr_props = {res.id: {prop.name: ids for prop, ids in propdict.items()}
+                                for res, propdict in stashed_resptr_props_original.items()}
+
+        # hardcode the expected values
+        stashed_xml_texts_expected = {
+            'obj_0001': {
+                'testonto:hasRichtext': [
+                    '\n This isbold andstringtext! It contains links to all '
+                    'resources:\n'
+                    ' obj_0000\n'
+                    ' obj_0001\n'
+                    ' obj_0002\n'
+                    ' obj_0003\n'
+                    ' obj_0004\n'
+                    ' obj_0005\n'
+                    ' obj_0006\n'
+                    ' obj_0007\n'
+                    ' obj_0008\n'
+                    ' obj_0009\n'
+                    ' obj_0010\n'
+                    ' obj_0011\n'
+                    ' \n '
+                ]
+            },
+            'obj_0011': {
+                'testonto:hasRichtext': [
+                    '\n This isbold andstringtext! It contains links to all '
+                    'resources:\n'
+                    ' obj_0000\n'
+                    ' obj_0001\n'
+                    ' obj_0002\n'
+                    ' obj_0003\n'
+                    ' obj_0004\n'
+                    ' obj_0005\n'
+                    ' obj_0006\n'
+                    ' obj_0007\n'
+                    ' obj_0008\n'
+                    ' obj_0009\n'
+                    ' obj_0010\n'
+                    ' obj_0011\n'
+                    ' \n '
+                ]
+            }
+        }
+        stashed_resptr_props_expected = {'obj_0000': {'testonto:hasTestThing': ['obj_0001']}}
+
+        # check if the stashes are equal to the expected stashes
+        self.assertDictEqual(stashed_resptr_props, stashed_resptr_props_expected)
+        self.assertDictEqual(stashed_xml_texts, stashed_xml_texts_expected)
+
+        # check if the stashed hashes can also be found at the correct position in the purged resources
+        for res, propdict in stashed_xml_texts_original.items():
+            for prop, hashdict in propdict.items():
+                purged_res = resources[resources.index(res)]
+                purged_prop = purged_res.properties[purged_res.properties.index(prop)]
+                purged_hashes = [str(val.value) for val in purged_prop.values if str(val.value) in stashed_xml_texts_hashes]
+                self.assertListEqual(list(hashdict), purged_hashes)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testdata/test-data.xml b/testdata/test-data.xml
index e1c9676a4..5a17faa6e 100644
--- a/testdata/test-data.xml
+++ b/testdata/test-data.xml
@@ -65,6 +65,12 @@
obj_0010
obj_0011
+
+ Another text without salsah-links
+
+
+ Another text without salsah-links
+
https://dasch.swiss
@@ -136,6 +142,12 @@
obj_0010
obj_0011
+
+ Another text without salsah-links
+
+
+ Another text without salsah-links
+
false