diff --git a/knora/dsp_tools.py b/knora/dsp_tools.py
index 18f0a659f..6c9d9274f 100644
--- a/knora/dsp_tools.py
+++ b/knora/dsp_tools.py
@@ -1,38 +1,47 @@
+"""
+The code in this file handles the arguments passed by the user from the command line and calls the requested actions.
+"""
import argparse
-import sys
import os
-import pkg_resources # part of setuptools
+import sys
-sys.path.append(os.path.dirname(os.path.realpath(__file__)))
+import pkg_resources # part of setuptools
-from dsplib.utils.onto_validate import validate_list, validate_ontology
from dsplib.utils.onto_create_lists import create_lists
from dsplib.utils.onto_create_ontology import create_ontology
from dsplib.utils.onto_get import get_ontology
-from dsplib.utils.xml_upload import xml_upload
from dsplib.utils.onto_process_excel import list_excel2json
+from dsplib.utils.onto_validate import validate_list, validate_ontology
+from dsplib.utils.xml_upload import xml_upload
+sys.path.append(os.path.dirname(os.path.realpath(__file__)))
+
+
+def program(args: list) -> None:
+ """
+ The program parses the command line arguments and calls the requested action
-def program(args):
+ Args:
+ args: list of arguments passed by the user from the command line
+
+ Returns:
+ None
+ """
version = pkg_resources.require("dsp-tools")[0].version
- #
# parse the arguments of the command line
- #
parser = argparse.ArgumentParser(
- description=f"dsp-tools (Version {version}) DaSCH Service Platform data modelling tools (© 2021 by DaSCH)."
- )
+ description=f"dsp-tools (Version {version}) DaSCH Service Platform data modelling tools (© 2021 by DaSCH).")
- subparsers = parser.add_subparsers(title="Subcommands",
- description='Valid subcommands are',
- help='sub-command help')
+ subparsers = parser.add_subparsers(title="Subcommands", description='Valid subcommands are', help='sub-command help')
parser_create = subparsers.add_parser('create', help='Create ontologies, lists etc.')
parser_create.set_defaults(action="create")
parser_create.add_argument("-s", "--server", type=str, default="http://0.0.0.0:3333", help="URL of the DSP server")
parser_create.add_argument("-u", "--user", default="root@example.com", help="Username for DSP server")
parser_create.add_argument("-p", "--password", default="test", help="The password for login")
- parser_create.add_argument("-V", "--validate", action='store_true', help="Do only validation of JSON, no upload of the ontology")
+ parser_create.add_argument("-V", "--validate", action='store_true',
+ help="Do only validation of JSON, no upload of the ontology")
parser_create.add_argument("-L", "--listfile", type=str, default="lists.json", help="Name of list node informationfile")
parser_create.add_argument("-l", "--lists", action='store_true', help="Only create the lists")
parser_create.add_argument("-v", "--verbose", action="store_true", help="Verbose feedback")
@@ -53,22 +62,22 @@ def program(args):
parser_upload.add_argument("-s", "--server", type=str, default="http://0.0.0.0:3333", help="URL of the DSP server")
parser_upload.add_argument("-u", "--user", type=str, default="root@example.com", help="Username for DSP server")
parser_upload.add_argument("-p", "--password", type=str, default="test", help="The password for login")
- parser_upload.add_argument("-V", "--validate", action='store_true', help="Do only validation of JSON, no upload of the ontology")
+ parser_upload.add_argument("-V", "--validate", action='store_true', help="Do only validation of XML, no upload of the data")
parser_upload.add_argument("-i", "--imgdir", type=str, default=".", help="Path to folder containing the images")
parser_upload.add_argument("-S", "--sipi", type=str, default="http://0.0.0.0:1024", help="URL of SIPI server")
parser_upload.add_argument("-v", "--verbose", action="store_true", help="Verbose feedback")
parser_upload.add_argument("xmlfile", help="path to xml file containing the data", default="data.xml")
- parser_excellists = subparsers.add_parser('excel', help='Create lists JSON from excel files')
- parser_excellists.set_defaults(action="excel")
- parser_excellists.add_argument("-S", "--sheet", type=str, help="Name of excel sheet to be used", default="Tabelle1")
- parser_excellists.add_argument("-s", "--shortcode", type=str, help="Shortcode of project", default="4123")
- parser_excellists.add_argument("-l", "--listname", type=str, help="Name of list to be created", default="my_list")
- parser_excellists.add_argument("-L", "--label", type=str, help="Label of list to be created", default="MyList")
- parser_excellists.add_argument("-x", "--lang", type=str, help="Language for label", default="en")
- parser_excellists.add_argument("-v", "--verbose", action="store_true", help="Verbose feedback")
- parser_excellists.add_argument("excelfile", help="Path to the excel file containing the list data", default="lists.xlsx")
- parser_excellists.add_argument("outfile", help="Path to the output JSON file containing the list data", default="list.json")
+ parser_excel_lists = subparsers.add_parser('excel', help='Create lists JSON from excel files')
+ parser_excel_lists.set_defaults(action="excel")
+ parser_excel_lists.add_argument("-S", "--sheet", type=str, help="Name of excel sheet to be used", default="Tabelle1")
+ parser_excel_lists.add_argument("-s", "--shortcode", type=str, help="Shortcode of project", default="4123")
+ parser_excel_lists.add_argument("-l", "--listname", type=str, help="Name of list to be created", default="my_list")
+ parser_excel_lists.add_argument("-L", "--label", type=str, help="Label of list to be created", default="MyList")
+ parser_excel_lists.add_argument("-x", "--lang", type=str, help="Language for label", default="en")
+ parser_excel_lists.add_argument("-v", "--verbose", action="store_true", help="Verbose feedback")
+ parser_excel_lists.add_argument("excelfile", help="Path to the excel file containing the list data", default="lists.xlsx")
+ parser_excel_lists.add_argument("outfile", help="Path to the output JSON file containing the list data", default="list.json")
args = parser.parse_args(args)
@@ -81,56 +90,24 @@ def program(args):
if args.validate:
validate_list(args.datamodelfile)
else:
- create_lists(input_file=args.datamodelfile,
- lists_file=args.listfile,
- server=args.server,
- user=args.user,
- password=args.password,
- verbose=args.verbose,
- dump=args.dump)
+ create_lists(input_file=args.datamodelfile, lists_file=args.listfile, server=args.server, user=args.user,
+ password=args.password, verbose=args.verbose, dump=args.dump)
else:
if args.validate:
validate_ontology(args.datamodelfile)
else:
- create_ontology(input_file=args.datamodelfile,
- lists_file=args.listfile,
- server=args.server,
- user=args.user,
- password=args.password,
- verbose=args.verbose,
- dump=args.dump if args.dump else False)
+ create_ontology(input_file=args.datamodelfile, lists_file=args.listfile, server=args.server, user=args.user,
+ password=args.password, verbose=args.verbose, dump=args.dump if args.dump else False)
elif args.action == "get":
- get_ontology(projident=args.project,
- outfile=args.datamodelfile,
- server=args.server,
- user=args.user,
- password=args.password,
- verbose=args.verbose)
+ get_ontology(projident=args.project, outfile=args.datamodelfile, server=args.server, user=args.user,
+ password=args.password, verbose=args.verbose)
elif args.action == "xmlupload":
- xml_upload(input_file=args.xmlfile,
- server=args.server,
- user=args.user,
- password=args.password,
- imgdir=args.imgdir,
- sipi=args.sipi,
- verbose=args.verbose,
- validate=args.validate)
+ xml_upload(input_file=args.xmlfile, server=args.server, user=args.user, password=args.password, imgdir=args.imgdir,
+ sipi=args.sipi, verbose=args.verbose, validate_only=args.validate)
elif args.action == "excel":
- list_excel2json(excelpath=args.excelfile,
- sheetname=args.sheet,
- shortcode=args.shortcode,
- listname=args.listname,
- label=args.label,
- lang=args.lang,
- outfile=args.outfile,
- verbose=args.verbose)
-
-
-
-def main():
- program(sys.argv[1:])
+ list_excel2json(excelpath=args.excelfile, sheetname=args.sheet, shortcode=args.shortcode, listname=args.listname,
+ label=args.label, lang=args.lang, outfile=args.outfile, verbose=args.verbose)
if __name__ == '__main__':
program(sys.argv[1:])
-
diff --git a/knora/dsplib/models/sipi.py b/knora/dsplib/models/sipi.py
index cf6606b10..c8a25b717 100644
--- a/knora/dsplib/models/sipi.py
+++ b/knora/dsplib/models/sipi.py
@@ -1,36 +1,49 @@
+import os
+
import requests
+
from .helpers import BaseError
-import os
-class Sipi:
+def on_api_error(res):
+ """
+ Checks for any API errors
+ Args:
+ res: the response from the API which is checked, usually in JSON format
- def __init__(self, sipiserver: str, token: str):
- self.sipiserver = sipiserver
- self.token = token
+ Returns:
+ Knora Error that is being raised
+ """
- def on_api_error(self, res):
- """
- Method to check for any API errors
- :param res: The input to check, usually JSON format
- :return: Possible KnoraError that is being raised
- """
+ if res.status_code != 200:
+ raise BaseError("SIPI-ERROR: status code=" + str(res.status_code) + "\nMessage:" + res.text)
- if res.status_code != 200:
- raise BaseError("SIPI-ERROR: status code=" + str(res.status_code) + "\nMessage:" + res.text)
+ if 'error' in res:
+ raise BaseError("SIPI-ERROR: API error: " + res.error)
- if 'error' in res:
- raise BaseError("SIPI-ERROR: API error: " + res.error)
+
+class Sipi:
+ """Represents the Sipi instance"""
+
+ def __init__(self, sipi_server: str, token: str):
+ self.sipi_server = sipi_server
+ self.token = token
def upload_bitstream(self, filepath):
- print(f"filepath=${os.path.basename(filepath)} (${filepath})")
- with open(filepath, 'rb') as bitstreamfile:
- files = {
- 'file': (os.path.basename(filepath), bitstreamfile),
- }
- req = requests.post(self.sipiserver + "/upload?token=" + self.token,
- files=files)
- self.on_api_error(req)
+ """
+ Uploads a bitstream to the Sipi server
+
+ Args:
+ filepath: path to the file, could be either absolute or relative
+
+ Returns:
+ API response
+ """
+ with open(filepath, 'rb') as bitstream_file:
+ files = {'file': (os.path.basename(filepath), bitstream_file), }
+ req = requests.post(self.sipi_server + "/upload?token=" + self.token, files=files)
+ on_api_error(req)
+ print(f'Uploaded file {filepath}')
res = req.json()
return res
diff --git a/knora/dsplib/utils/xml_upload.py b/knora/dsplib/utils/xml_upload.py
index 04b176100..51b870be5 100644
--- a/knora/dsplib/utils/xml_upload.py
+++ b/knora/dsplib/utils/xml_upload.py
@@ -1,31 +1,23 @@
+"""
+The code in this file handles the import of XML data into the DSP platform.
+"""
import os
-import re
+from typing import List, Dict, Optional, Union
-from typing import List, Set, Dict, Tuple, Optional, Union
from lxml import etree
-from pprint import pprint
from dsplib.models.connection import Connection
from dsplib.models.group import Group
+from dsplib.models.permission import Permissions
from dsplib.models.project import Project
from dsplib.models.resource import ResourceInstanceFactory
-from dsplib.models.value import BooleanValue, ColorValue, DateValue, DecimalValue, IntValue, IntervalValue, TextValue, \
- UriValue, KnoraStandoffXml, make_value
-from dsplib.models.permission import PermissionValue, Permissions
from dsplib.models.sipi import Sipi
+from dsplib.models.value import KnoraStandoffXml
-StrDict = Dict[str, str]
-
-StrObj = Union[str, StrDict]
-
-VarStrObj = Union[StrObj, List[StrObj]]
-
-richtext_tags = [
- 'p', 'em', 'strong', 'u', 'sub', 'strike', 'a', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'ol', 'ul', 'li', 'tbody',
- 'table', 'tr', 'td', 'br', 'hr', 'pre', 'cite', 'blockquote', 'code'
-]
class XmlError(BaseException):
+ """Represents an error raised in the context of the XML import"""
+ _message: str
def __init__(self, msg: str):
self._message = msg
@@ -35,22 +27,25 @@ def __str__(self):
class ProjectContext:
- _projects: Project
- _groups: Group
- _projectmap: Dict[str, str]
- _invprojectmap: Dict[str, str]
- _groupmap: Dict[str, str]
- _shortcode: Union[str, None]
- _project_name: Union[str, None]
+ """Represents the project context"""
+
+ _projects: list[Project]
+ _project_map: Dict[str, str] # dictionary of (project name:project IRI) pairs
+ _inv_project_map: Dict[str, str] # dictionary of (project IRI:project name) pairs
+ _groups: list[Group]
+ _group_map: Dict[str, str]
+ _shortcode: Optional[str]
+ _project_name: Optional[str]
def __init__(self, con: Connection, shortcode: Optional[str] = None):
self._shortcode = shortcode
self._projects = Project.getAllProjects(con=con)
- self._projectmap: Dict[str, str] = {x.shortname: x.id for x in self._projects}
- invprojectmap: Dict[str, str] = {x.id: x.shortname for x in self._projects}
+ self._project_map: Dict[str, str] = {x.shortname: x.id for x in self._projects}
+ self._inv_project_map: Dict[str, str] = {x.id: x.shortname for x in self._projects}
self._groups = Group.getAllGroups(con=con)
- self._groupmap: Dict[str, str] = {invprojectmap[x.project] + ':' + x.name: x.id for x in self._groups}
+ self._group_map: Dict[str, str] = {self._inv_project_map[x.project] + ':' + x.name: x.id for x in self._groups}
self._project_name = None
+ # get the project name from the shortcode
if self._shortcode:
for p in self._projects:
if p.shortcode == self._shortcode:
@@ -58,36 +53,26 @@ def __init__(self, con: Connection, shortcode: Optional[str] = None):
break
@property
- def groupmap(self):
- return self._groupmap
-
- @property
- def shortcode(self) -> Union[str, None]:
- return self._shortcode
-
- @shortcode.setter
- def shortcode(self, val: str) -> None:
- for p in self._projects:
- if p.shortcode == self._shortcode:
- self._project_name = p.shortname
- break
+ def group_map(self) -> Dict[str, str]:
+ """Dictionary of (project:group name) and (group id) pairs of all groups in project"""
+ return self._group_map
@property
- def project_name(self) -> Union[str, None]:
+ def project_name(self) -> Optional[str]:
+ """Name of the project"""
return self._project_name
class KnoraValue:
+ """Represents a value of a resource in the Knora ontology"""
+
_value: Union[str, KnoraStandoffXml]
- _resrefs: List[str]
+ _resrefs: Optional[List[str]]
_comment: str
_permissions: str
- is_richtext: bool
+ _is_richtext: bool
- def __init__(self,
- node: etree.Element,
- valtype: str,
- listname: Optional[str] = None) -> None:
+ def __init__(self, node: etree.Element, val_type: str, listname: Optional[str] = None) -> None:
self._resrefs = None
self._comment = node.get('comment')
@@ -98,199 +83,236 @@ def __init__(self,
xmlstr = xmlstr.replace('', '')
xmlstr = xmlstr.replace('', '')
self._value = KnoraStandoffXml(xmlstr)
- tmpidlist = self._value.findall()
- if tmpidlist:
+ tmp_id_list = self._value.findall()
+ if tmp_id_list:
refs = set()
- for tmpid in tmpidlist:
- refs.add(tmpid.split(':')[1])
+ for tmp_id in tmp_id_list:
+ refs.add(tmp_id.split(':')[1])
self._resrefs = list(refs)
else:
- if valtype == 'list':
+ if val_type == 'list':
self._value = listname + ':' + "".join(node.itertext())
else:
self._value = "".join(node.itertext())
@property
- def value(self):
+ def value(self) -> Union[str, KnoraStandoffXml]:
+ """The actual value of the value instance"""
return self._value
@property
- def resrefs(self):
+ def resrefs(self) -> Optional[List[str]]:
+ """List of resource references"""
return self._resrefs
@property
- def comment(self):
+ def comment(self) -> str:
+ """Comment about the value"""
return self._comment
@property
- def permissions(self):
+ def permissions(self) -> str:
+ """Reference to the set of permissions for the value"""
return self._permissions
- def print(self) -> None:
- """
- Print value to stdout for debugging...
+ @property
+ def is_richtext(self) -> bool:
+ """true if text value is of type richtext, false otherwise"""
+ return self._is_richtext
- :return: None
- """
- print(' Value: ' + str(self._value))
+ def print(self) -> None:
+ """Prints the value and its attributes."""
+ print(' Value: ' + str(self._value))
if self._comment:
print(' Comment:' + self._comment)
if self._resrefs is not None:
for i in self._resrefs:
- print(' resref: ' + i)
+ print(' res_ref: ' + i)
class KnoraProperty:
+ """Represents a property of a resource in the XML"""
+
_name: str
_valtype: str
_values: List[KnoraValue]
def __init__(self, node: etree.Element, valtype: str, default_ontology: Optional[str] = None):
- tmp = node.attrib['name'].split(':')
- if len(tmp) > 1:
- if tmp[0]:
+ """
+ The constructor for the knora property
+
+ Args:
+ node: the property node, p.ex.
+ valtype: the type of value given by the name of the property node, p.ex. decimal in
+ default_ontology: the name of the ontology
+ """
+ # get the property name which is in format namespace:propertyname, p.ex. rosetta:hasName
+ tmp_prop_name = node.attrib['name'].split(':')
+ if len(tmp_prop_name) > 1:
+ if tmp_prop_name[0]:
self._name = node.attrib['name']
else:
- self._name = default_ontology + ':' + tmp[1]
+ # replace an empty namespace with the default ontology name
+ self._name = default_ontology + ':' + tmp_prop_name[1]
else:
- self._name = 'knora-admin:' + tmp[0]
+ self._name = 'knora-admin:' + tmp_prop_name[0]
listname = node.attrib.get('list') # safe the list name if given (only for lists)
self._valtype = valtype
self._values = []
+ # parse the subnodes of the property nodes which contain the actual values of the property
for subnode in node:
if subnode.tag == valtype: # the subnode must correspond to the expected value type
self._values.append(KnoraValue(subnode, valtype, listname))
else:
- raise XmlError('Unexpected tag: "{}" may contain only tags!'.format(subnode.tag))
+ raise XmlError('Unexpected tag: "{}". Property may contain only value tags!'.format(subnode.tag))
@property
- def name(self):
+ def name(self) -> str:
+ """The name of the property"""
return self._name
@property
- def valtype(self):
+ def valtype(self) -> str:
+ """The value type of the property"""
return self._valtype
@property
- def values(self):
+ def values(self) -> List[KnoraValue]:
+ """List of values of this property"""
return self._values
def print(self) -> None:
+ """Prints the property."""
print(' Property: {} Type: {}'.format(self._name, self._valtype))
for value in self._values:
value.print()
class KnoraResource:
+ """Represents a resource in the Knora ontology"""
+
_id: str
_label: str
_restype: str
_permissions: str
- _bitstream: str
+ _bitstream: Optional[str]
_properties: List[KnoraProperty]
def __init__(self, node: etree.Element, default_ontology: Optional[str] = None) -> None:
"""
Constructor that parses a resource node from the XML DOM
- :param context: Context for DOM node traversal
- :param node: The DOM node to be processed (representing a resource)
+ Args:
+ node: The DOM node to be processed representing a resource (which is a child of the knora element)
+ default_ontology: The default ontology (given in the attribute default-ontology of the knora element)
"""
- self._id = node.attrib['id'] # safe the unique id
+ self._id = node.attrib['id']
self._label = node.attrib['label']
- tmp = node.attrib['restype'].split(':')
- if len(tmp) > 1:
- if tmp[0]:
+ # get the resource type which is in format namespace:resourcetype, p.ex. rosetta:Image
+ tmp_res_type = node.attrib['restype'].split(':')
+ if len(tmp_res_type) > 1:
+ if tmp_res_type[0]:
self._restype = node.attrib['restype']
else:
- self._restype = default_ontology + ':' + tmp[1]
+ # replace an empty namespace with the default ontology name
+ self._restype = default_ontology + ':' + tmp_res_type[1]
else:
- self._restype = 'knora-admin:' + tmp[0]
+ self._restype = 'knora-admin:' + tmp_res_type[0]
self._permissions = node.attrib['permissions']
self._bitstream = None
self._properties = []
for subnode in node:
if subnode.tag == 'bitstream':
- self._bitstream = subnode.text
+ self._bitstream = subnode.text # path to the file
elif subnode.tag is etree.Comment:
- continue
+ continue
else:
- ptype, dummy = subnode.tag.split('-')
- self._properties.append(KnoraProperty(subnode, ptype, default_ontology))
+ # get the property type which is in format type-prop, p.ex.
+ prop_type, _ = subnode.tag.split('-')
+ self._properties.append(KnoraProperty(subnode, prop_type, default_ontology))
@property
def id(self) -> str:
+ """The unique id of the resource"""
return self._id
@property
def label(self) -> str:
+ """The label of the resource"""
return self._label
@property
def restype(self) -> str:
+ """The type of the resource"""
return self._restype
@property
- def bitstream(self) -> str:
- return self._bitstream
+ def permissions(self) -> str:
+ """The reference to the permissions set for this resource"""
+ return self._permissions
@property
- def permissions(self):
- return self._permissions
+ def bitstream(self) -> Optional[str]:
+ """The path to the bitstream object (file) belonging to the resource"""
+ return self._bitstream
- def print(self):
- print(f'Resource: id={self._id} restype: {self._restype} label: {self._label}')
+ def print(self) -> None:
+ """Prints the resource and its attributes."""
+ print(f'Resource: id={self._id}, restype: {self._restype}, label: {self._label}')
if self._bitstream is not None:
- print(' Bitstream: ' + self._bitstream)
- for property in self._properties:
- property.print()
+ print(' Bitstream: ' + self._bitstream)
+ for prop in self._properties:
+ prop.print()
def get_resptrs(self) -> List[str]:
"""
- Return a list of all reesource id's that a referenced by this resource
- :return: List of resources identified by their unique id's
+ Get a list of all resource id's that are referenced by this resource
+
+ Returns:
+ List of resources identified by their unique id's (as given in the XML)
"""
resptrs: List[str] = []
- for property in self._properties:
- if property.valtype == 'resptr':
- for value in property.values:
+ for prop in self._properties:
+ if prop.valtype == 'resptr':
+ for value in prop.values:
resptrs.append(value.value)
- elif property.valtype == 'text':
- for value in property.values:
+ elif prop.valtype == 'text':
+ for value in prop.values:
if value.resrefs is not None:
resptrs.extend(value.resrefs)
return resptrs
- def get_propvals(self,
- resiri_lookup: StrDict,
- permissions_lookup: StrDict) -> Dict[str, VarStrObj]:
+ def get_propvals(self, resiri_lookup: Dict[str, str], permissions_lookup: Dict[str, Permissions]) -> Dict[str, Permissions]:
"""
- A function which retrieves...
+ Get a dictionary of the property names and their values belonging to a resource
- :param resiri_lookup: Is used to solve internal unique_id's of resourcs to real IRI's
- :param permissions_lookup: Is usd to resolve thee permission ID's to permission sets
- :return: A dict of values with the property name as key and a single value. This dict represents
- the JSON structure that Knora.create_resource() expects.
+ Args:
+ resiri_lookup: Is used to solve internal unique id's of resources to real IRI's
+ permissions_lookup: Is used to resolve the permission id's to permission sets
+
+ Returns:
+ A dict of values with the property name as key and a single value. This dict represents the JSON structure
+ that Knora.create_resource() expects.
"""
- propdata = {}
- for property in self._properties:
- vals: List[StrObj] = [] # == List[Union[str,StrDict]
- for value in property.values:
+ prop_data = {}
+ for prop in self._properties:
+ vals: List[Union[str, Dict[str, str]]] = []
+ for value in prop.values:
v: str
- if property.valtype == 'resptr': # we have a resptr, therefore simple lookup or IRI
+ if prop.valtype == 'resptr': # we have a resptr, therefore simple lookup or IRI
iri = resiri_lookup.get(value.value)
if iri is not None:
v = iri
else:
v = value.value # if we do not find the unique_id, we assume it's a valid knora IRI
- elif property.valtype == 'text':
+ elif prop.valtype == 'text':
if isinstance(value.value, KnoraStandoffXml):
- irirefs = value.value.findall() # The IRI's must be embedded as "...IRI:unique_id:IRI..."
- for iriref in irirefs:
- resid = iriref.split(':')[1]
- iri = resiri_lookup.get(resid)
- value.value.replace(iriref, iri)
+ iri_refs = value.value.findall() # The IRI's must be embedded as "...IRI:unique_id:IRI..."
+ for iri_ref in iri_refs:
+ res_id = iri_ref.split(':')[1]
+ iri = resiri_lookup.get(res_id)
+ value.value.replace(iri_ref, iri)
v = value.value
else:
v = value.value
@@ -306,11 +328,13 @@ def get_propvals(self,
if value.permissions is not None:
tmp['permissions'] = permissions_lookup.get(value.permissions)
vals.append(tmp)
- propdata[property.name] = vals if len(vals) > 1 else vals[0] # append a Union[StrObj,List[StrObj]]
- return propdata
+ prop_data[prop.name] = vals if len(vals) > 1 else vals[0]
+ return prop_data
class XmlAllow:
+ """Represents the allow element of the XML"""
+
_group: str
_permission: str
@@ -318,8 +342,12 @@ def __init__(self, node: etree.Element, project_context: ProjectContext) -> None
"""
Constructor which parses the XML DOM allow element
- :param context: Context for DOM node traversal
- :param node: The DOM node to be processed (representing an single right in a permission set)
+ Args:
+ node: The DOM node to be processed (represents a single right in a permission set)
+ project_context: Context for DOM node traversal
+
+ Returns:
+ None
"""
tmp = node.attrib['group'].split(':')
sysgroups = ['UnknownUser', 'KnownUser', 'ProjectMember', 'Creator', 'ProjectAdmin', 'SystemAdmin']
@@ -328,7 +356,7 @@ def __init__(self, node: etree.Element, project_context: ProjectContext) -> None
if tmp[0] == 'knora-admin' and tmp[1] in sysgroups:
self._group = node.attrib['group']
else:
- self._group = project_context.groupmap.get(node.attrib['group'])
+ self._group = project_context.group_map.get(node.attrib['group'])
if self._group is None:
raise XmlError("Group \"{}\" is not known: Cannot find project!".format(node.attrib['group']))
else:
@@ -343,21 +371,23 @@ def __init__(self, node: etree.Element, project_context: ProjectContext) -> None
self._permission = node.text
@property
- def group(self):
+ def group(self) -> str:
+ """The group specified in the allow element"""
return self._group
@property
- def permission(self):
+ def permission(self) -> str:
+ """The reference to a set of permissions"""
return self._permission
- def print(self):
+ def print(self) -> None:
+ """Prints the attributes of the XmlAllow instance"""
print(" group=", self._group, " permission=", self._permission)
class XmlPermission:
- """
- A class representing a permission set
- """
+ """Represents the permission set containing several XmlAllow elements"""
+
_id: str
_allows: List[XmlAllow]
@@ -365,8 +395,9 @@ def __init__(self, node: etree.Element, project_context: ProjectContext) -> None
"""
Constructor which parses a XML DOM permissions element representing an named permission set
- :param context: Context for DOM node traversal
- :param node: The DOM node to be processed (representing an a permission set)
+ Args:
+ node: The DOM node to be processed (representing an a permission set)
+ project_context: Context for DOM node traversal
"""
self._allows = []
self._id = node.attrib['id']
@@ -375,45 +406,52 @@ def __init__(self, node: etree.Element, project_context: ProjectContext) -> None
@property
def id(self) -> str:
+ """The id of the permission set, p.ex. res-default"""
return self._id
@property
def allows(self) -> List[XmlAllow]:
+ """List of XmlAllow elements defining permissions for specific groups"""
return self._allows
def get_permission_instance(self) -> Permissions:
+ """Returns a list of allow elements of this permission instance"""
permissions = Permissions()
for allow in self._allows:
permissions.add(allow.permission, allow.group)
return permissions
def __str__(self):
- allowstrs: List[str] = []
+ allow_str: List[str] = []
for allow in self._allows:
- allowstrs.append("{} {}".format(allow.permission, allow.group))
- return '|'.join(allowstrs)
+ allow_str.append("{} {}".format(allow.permission, allow.group))
+ return '|'.join(allow_str)
def print(self):
+ """Prints the permission set"""
print('Permission: ', self._id)
for a in self._allows:
a.print()
-def do_sortorder(resources: List[KnoraResource]) -> List[KnoraResource]:
+def do_sort_order(resources: List[KnoraResource]) -> List[KnoraResource]:
"""
- Sort the list of resources such that resources that reference other resources are
- added after the referenced resources. It will fail with an error if there are circular
- references.
+ Sorts a list of resources.
+
+ The sorting is such that resources that reference other resources are added after the referenced resources. It
+ will fail with an error if there are circular references.
- :param resources: List of resources before sorting
- :return: Sorted list of resources
+ Args:
+ resources: List of resources before sorting
+
+ Returns:
+ sorted list of resources
"""
- #
- # here we sort the resources according to outgoing resptrs
- #
+
+ # sort the resources according to outgoing resptrs
ok_resources: [KnoraResource] = []
notok_resources: [KnoraResource] = []
- ok_resids : [str] = []
+ ok_res_ids: [str] = []
cnt = 0
notok_len = 9999999
while len(resources) > 0 and cnt < 10000:
@@ -421,17 +459,17 @@ def do_sortorder(resources: List[KnoraResource]) -> List[KnoraResource]:
resptrs = resource.get_resptrs()
if len(resptrs) == 0:
ok_resources.append(resource)
- ok_resids.append(resource.id)
+ ok_res_ids.append(resource.id)
else:
ok = True
for resptr in resptrs:
- if resptr in ok_resids:
+ if resptr in ok_res_ids:
pass
else:
- ok = False;
+ ok = False
if ok:
ok_resources.append(resource)
- ok_resids.append(resource.id)
+ ok_res_ids.append(resource.id)
else:
notok_resources.append(resource)
resources = notok_resources
@@ -449,91 +487,115 @@ def do_sortorder(resources: List[KnoraResource]) -> List[KnoraResource]:
notok_resources = []
cnt += 1
print('{}. Ordering pass Finished!'.format(cnt))
- print('Remaining: {}'.format(len(resources)))
+ # print('Remaining: {}'.format(len(resources)))
return ok_resources
-def xml_upload(input_file: str,
- server: str,
- user: str,
- password: str,
- imgdir: str,
- sipi: str,
- verbose: bool,
- validate: bool) -> bool:
- current_dir = os.path.dirname(os.path.realpath(__file__))
+def validate_xml_against_schema(input_file: str, schema_file: str) -> bool:
+ """
+ Validates an XML file against an XSD schema
+
+ Args:
+ input_file: the XML file to be validated
+ schema_file: the schema against which the XML file should be validated
- xmlschema_doc = etree.parse(os.path.join(current_dir, 'knora-data-schema.xsd'))
- xmlschema = etree.XMLSchema(xmlschema_doc)
+ Returns:
+ True if the XML file is valid, False otherwise
+ """
+ xmlschema = etree.XMLSchema(schema_file)
doc = etree.parse(input_file)
- xmlschema.assertValid(doc)
- del xmlschema
- del doc
- del xmlschema_doc
+ is_valid = False
+
+ if xmlschema.assertValid(doc):
+ is_valid = True
+
+ return is_valid
+
+
+def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: str, sipi: str, verbose: bool,
+ validate_only: bool) -> bool:
+ """
+ This function reads an XML file and imports the data described in it onto the DSP server.
+
+ Args:
+ input_file : the XML with the data to be imported onto the DSP server
+ server : the DSP server where the data should be imported
+ user : the user (e-mail) with which the data should be imported
+ password : the password of the user with which the data should be imported
+ imgdir : the image directory
+ sipi : the sipi instance to be used
+ verbose : verbose option for the command, if used more output is given to the user
+ validate_only : validation option to validate the XML data without the actual import of the data
+
+ Returns:
+ None
+ """
- if validate:
- return
+ # Validate the input XML file
+ current_dir = os.path.dirname(os.path.realpath(__file__))
+ schema_file = etree.parse(os.path.join(current_dir, 'knora-data-schema.xsd'))
- print("The input data file is syntactically correct and passed validation!")
+ if validate_xml_against_schema(input_file, schema_file):
+ print("The input data file is syntactically correct and passed validation!")
+ if validate_only:
+ return True
- #
- # Connect to the DaSCH Service Platform API
- #
+ # Connect to the DaSCH Service Platform API and get the project context
con = Connection(server)
con.login(user, password)
-
proj_context = ProjectContext(con=con)
resources: List[KnoraResource] = []
permissions: Dict[str, XmlPermission] = {}
- shortcode: Union[str, None] = None
- default_ontology = None
- #
- # read the XML file containing the data, including project shortcode
- #
+ # parse the XML file containing the data
tree = etree.parse(input_file)
knora = tree.getroot()
default_ontology = knora.attrib['default-ontology']
shortcode = knora.attrib['shortcode']
+
for child in knora:
+ # get all permissions
if child.tag == "permissions":
permission = XmlPermission(child, proj_context)
permissions[permission.id] = permission
+ # get all resources
elif child.tag == "resource":
resources.append(KnoraResource(child, default_ontology))
- #
- # sort the resources so that resources which do not link to others come first
- #
- resources = do_sortorder(resources)
+ # sort the resources (resources which do not link to others come first)
+ resources = do_sort_order(resources)
sipi = Sipi(sipi, con.get_token())
- factory = ResourceInstanceFactory(con, shortcode)
+ # get the project information and project ontology from the server
+ project = ResourceInstanceFactory(con, shortcode)
+ # create a dictionary to look up permissions
permissions_lookup: Dict[str, Permissions] = {}
for key, perm in permissions.items():
permissions_lookup[key] = perm.get_permission_instance()
- resclassnames = factory.get_resclass_names()
- resclasses: Dict[str, type] = {}
- for resclassname in resclassnames:
- resclasses[resclassname] = factory.get_resclass(resclassname)
- resiri_lookup: StrDict = {}
+ # create a dictionary to look up resource classes
+ res_classes: Dict[str, type] = {}
+ for res_class_name in project.get_resclass_names():
+ res_classes[res_class_name] = project.get_resclass(res_class_name)
+
+ res_iri_lookup: Dict[str, str] = {}
for resource in resources:
- #resource.print()
+ if verbose:
+ resource.print()
if resource.bitstream:
img = sipi.upload_bitstream(os.path.join(imgdir, resource.bitstream))
bitstream = img['uploadedFiles'][0]['internalFilename']
else:
bitstream = None
- instance = resclasses[resource.restype](con=con,
- label=resource.label,
- permissions=permissions_lookup.get(resource.permissions),
- bitstream=bitstream,
- values=resource.get_propvals(resiri_lookup, permissions_lookup)).create()
- resiri_lookup[resource.id] = instance.iri
- print("Created:", instance.iri)
+
+ # create the resource on the server
+ instance = res_classes[resource.restype](con=con, label=resource.label,
+ permissions=permissions_lookup.get(resource.permissions), bitstream=bitstream,
+ values=resource.get_propvals(res_iri_lookup, permissions_lookup)).create()
+ res_iri_lookup[resource.id] = instance.iri
+ print("Created resource: ", instance.label, " (", resource.id, ") with IRI ", instance.iri)
diff --git a/test/test_tools.py b/test/test_tools.py
index ed40e5ef6..0356a3a81 100644
--- a/test/test_tools.py
+++ b/test/test_tools.py
@@ -1,17 +1,15 @@
-import unittest
import json
+import unittest
-from dsplib.utils.onto_get import get_ontology
+from dsplib.utils.onto_create_ontology import create_ontology
from dsplib.utils.onto_get import get_ontology
from dsplib.utils.onto_validate import validate_ontology
-from dsplib.utils.onto_create_ontology import create_ontology
from knora.dsplib.utils.xml_upload import xml_upload
class TestTools(unittest.TestCase):
def test_get(self):
-
with open('testdata/anything.json') as f:
jsonstr = f.read()
refobj = json.loads(jsonstr)
@@ -49,8 +47,7 @@ def test_xmlupload(self):
imgdir="testdata/bitstreams",
sipi="http://0.0.0.0:1024",
verbose=True,
- validate=False)
-
+ validate_only=False)
if __name__ == '__main__':