From 4dad0ced91f0db5ef80e9690d4ce1866475577fa Mon Sep 17 00:00:00 2001 From: irinaschubert Date: Tue, 7 Dec 2021 08:34:23 +0100 Subject: [PATCH] fix(permissions): use permissions in xml upload (DEV-178) (#127) * Create test data * make permissions optional in schema * use optional type for permissions in xml_upload * update .gitignore * precise the use of username for CLI options * create testdata to test permissions * update xml_upload to handle optional permissions * Add bitstream resources to tests * use resource permissions for bitstream objects * remove duplicated code * remove duplicated code * update documentation * Remove unused file knora.py * use images to test permissions * allow permissions attribute for bitstreams * Refactor xml_upload.py * add bitstream to models * update documentation * Rename Knora instances * Update test data * Rename output labels * refactor bitstream * add bitstream to bazel * refactor resource, value and bitstream * cleanup code * update documentation * Allow separate permission for bitstream * add image related permissions to test data * fix failing test * refactor bitstream * Update documentation * update cli documentation * update url to permissions --- .gitignore | 1 + docs/dsp-tools-usage.md | 6 +- docs/dsp-tools-xmlupload.md | 49 +- knora/BUILD.bazel | 17 - knora/dsp_tools.py | 44 +- knora/dsplib/models/BUILD.bazel | 14 + knora/dsplib/models/bitstream.py | 37 + knora/dsplib/models/group.py | 1 - knora/dsplib/models/resource.py | 216 +-- knora/dsplib/models/value.py | 63 +- knora/dsplib/schemas/data.xsd | 21 +- knora/dsplib/utils/xml_upload.py | 158 +- knora/knora.py | 2108 ------------------------ test/e2e/BUILD.bazel | 1 + test/e2e/test_resource.py | 9 +- testdata/test-data.xml | 24 +- testdata/test-id2iri-data.xml | 2 +- testdata/test-id2iri-replaced.xml | 2 +- testdata/test-onto.json | 121 +- testdata/test-permissions.xml | 158 ++ testdata/tmp/_test-id2iri-replaced.xml | 2 +- 21 files changed, 659 insertions(+), 2395 deletions(-) create mode 100644 knora/dsplib/models/bitstream.py delete mode 100755 knora/knora.py create mode 100644 testdata/test-permissions.xml diff --git a/.gitignore b/.gitignore index 3c20dcce5..3ea529372 100644 --- a/.gitignore +++ b/.gitignore @@ -64,6 +64,7 @@ venv.bak/ # created files lists.json out.json +id2iri_* # bazel /bazel-* diff --git a/docs/dsp-tools-usage.md b/docs/dsp-tools-usage.md index 6cdadea3b..cae907fef 100644 --- a/docs/dsp-tools-usage.md +++ b/docs/dsp-tools-usage.md @@ -26,7 +26,7 @@ dsp-tools create [options] data_model_definition.json The following options are available: -- `-s` | `--server` _server_: URL of the DSP server (default: localhost:3333) +- `-s` | `--server` _server_: URL of the DSP server (default: 0.0.0.0:3333) - `-u` | `--user` _username_: username used for authentication with the DSP API (default: root@example.com) - `-p` | `--password` _password_: password used for authentication with the DSP API (default: test) - `-V` | `--validate`: If set, only the validation of the JSON file is performed. @@ -53,7 +53,7 @@ dsp-tools get [options] output_file.json The following options are available: -- `-s` | `--server` _server_: URL of the DSP server (default: localhost:3333) +- `-s` | `--server` _server_: URL of the DSP server (default: 0.0.0.0:3333) - `-u` | `--user` _username_: username used for authentication with the DSP API (default: root@example.com) - `-p` | `--password` _password_: password used for authentication with the DSP API (default: test) - `-P` | `--project` _shortcode_ | _shortname_ | _iri_: shortcode, shortname or (mandatory) @@ -77,7 +77,7 @@ dsp-tools xmlupload [options] xml_data_file.xml The following options are available: -- `-s` | `--server` _server_: URL of the DSP server (default: localhost:3333) +- `-s` | `--server` _server_: URL of the DSP server (default: 0.0.0.0:3333) - `-u` | `--user` _username_: username used for authentication with the DSP API (default: root@example.com) - `-p` | `--password` _password_: password used for authentication with the DSP API (default: test) - `-i` | `--imgdir` _dirpath_: path to the directory where the bitstream objects are stored (default: .) diff --git a/docs/dsp-tools-xmlupload.md b/docs/dsp-tools-xmlupload.md index 4db917a24..22d2dbf19 100644 --- a/docs/dsp-tools-xmlupload.md +++ b/docs/dsp-tools-xmlupload.md @@ -39,7 +39,7 @@ The `` element may look as follows: The `` element can only contain the following sub-elements: -- `` +- `` (optional) - `` ## Describing permissions with <permissions> elements @@ -48,6 +48,9 @@ The DSP server provides access control for each resource and each field of a res thorough explanation of the permission and access system of the DSP platform, see [DSP platform permissions](https://docs.knora.org/02-knora-ontologies/knora-base/#permissions). +It is optional to define permissions in the XML. If not defined, default permissions are applied (only project and +system administrators can view and edit resources). + The following access rights are defined by the DSP platform which apply to either a resource or a field: - `RV` _restricted view permission_: The user gets only a restricted view of the element. E.g. in case of a still image @@ -72,7 +75,7 @@ name has to be prepended before the group name, separated by a colon, e.g. `dsp- A `` element contains the permissions given to the selected groups and is called a _permission set_. It has a mandatory attribute `id` and must contain at least one `` element per group indicating the group's permission. -It is of the following form: +The permission is referenced inside the resource or property tag by its `id`. It is of the following form: ```xml @@ -84,6 +87,18 @@ It is of the following form: ``` +If you don't want a group to have access at all, leave it out. In the following example, only `ProjectAdmin`s will see +the resource or property with permission `special-permission`: + +```xml + + CR + +``` + +Note: The permissions defined in the XML are applied to resources that are created. But only project or system administrators +do have the permission to create resources via the XML upload. + ### The <allow> sub-element The `` element is used to define the permission for a specific group. It is of the following form: @@ -94,23 +109,23 @@ The `` element is used to define the permission for a specific group. It The allowed values are: -- `RV` _restricted view_: The associated media is shown in reduced quality. -- `V` _view_: The user has read access to the data. -- `M` _modify_: The user may modify a value, but may not delete it. The original value will be preserved using the - history mechanism. -- `D` _delete_: The user is able to mark a resource as deleted. -- `CR` _change right_: The user is able to change the right of a resource or value. +- `RV` _restricted view_: Same as `V` but if it is applied to an image, the image is shown blurred. +- `V` _view_: The user can view a resource or a value, but can not modify it. +- `M` _modify_: The user can modify a resource or value, but can not delete it. The original resource or value will be preserved. +- `D` _delete_: The user can mark a resource or value as deleted. The original resource or value will be preserved. +- `CR` _change right_: The user can change the permission of a resource or value. The user is also allowed to + permanently delete (erase) a resource. -The `group` attribute is mandatory. It defines the group which the permission is applied to. The DSP system groups as +The `group` attribute is mandatory. It defines the group which the permission is applied to. DSP system groups as well as project specific groups are supported. A project specific group name has the form `project-shortname:groupname`. The available system groups are: -- UnknownUser -- KnownUser -- ProjectMember -- Creator -- ProjectAdmin -- SystemAdmin +- UnknownUser (not logged in user) +- KnownUser (logged in user) +- ProjectMember (user with project membership) +- Creator (creator of the resource or value) +- ProjectAdmin (user with project admin membership) +- SystemAdmin (system administrator) There are no sub-elements allowed for the `` element. @@ -225,12 +240,12 @@ Note: Attributes: -- none +- `permissions` : ID or a permission set (optional, but if omitted very restricted default permissions apply) Example: ```xml -postcards/images/EURUS015a.jpg +postcards/images/EURUS015a.jpg ``` ### `` diff --git a/knora/BUILD.bazel b/knora/BUILD.bazel index 6754fcadb..e262bab94 100644 --- a/knora/BUILD.bazel +++ b/knora/BUILD.bazel @@ -4,23 +4,6 @@ load("@rules_python//python:defs.bzl", "py_binary", "py_library") # make the dependencies from requirements.txt available load("@knora_py_deps//:requirements.bzl", "requirement") -py_library( - name = "knora", - visibility = ["//visibility:public"], - srcs = ["knora.py"], - deps = [ - requirement("rdflib"), - requirement("lxml"), - requirement("validators"), - requirement("requests"), - requirement("jsonschema"), - requirement("click"), - requirement("rfc3987"), - requirement("pprint"), - ], - imports = ["."], -) - py_binary( name = "create_ontology", visibility = ["//visibility:public"], diff --git a/knora/dsp_tools.py b/knora/dsp_tools.py index 8038c2886..be828e6e2 100644 --- a/knora/dsp_tools.py +++ b/knora/dsp_tools.py @@ -27,6 +27,16 @@ def program(user_args: list[str]) -> None: Returns: None """ + # help texts + username_text = 'Username (e-mail) for DSP server' + password_text = 'The password for login' + url_text = 'URL of the DSP server' + verbose_text = 'Verbose feedback' + + # default values + default_localhost = 'http://0.0.0.0:3333' + default_user = 'root@example.com' + default_pw = 'test' dsp_tools_version = version('dsp-tools') now = datetime.datetime.now() @@ -42,41 +52,41 @@ def program(user_args: list[str]) -> None: help='Upload an ontology and/or list(s) from a JSON file to the DaSCH ' 'Service Platform') parser_create.set_defaults(action='create') - parser_create.add_argument('-s', '--server', type=str, default='http://0.0.0.0:3333', help='URL of the DSP server') - parser_create.add_argument('-u', '--user', default='root@example.com', help='Username for DSP server') - parser_create.add_argument('-p', '--password', default='test', help='The password for login') + parser_create.add_argument('-s', '--server', type=str, default=default_localhost, help=url_text) + parser_create.add_argument('-u', '--user', default=default_user, help=username_text) + parser_create.add_argument('-p', '--password', default=default_pw, help=password_text) parser_create.add_argument('-V', '--validate', action='store_true', help='Do only validation of JSON, no upload of the ' 'ontology') parser_create.add_argument('-L', '--listfile', type=str, default='lists.json', help='Name of list node informationfile') parser_create.add_argument('-l', '--lists', action='store_true', help='Upload only the list(s)') - parser_create.add_argument('-v', '--verbose', action='store_true', help='Verbose feedback') + parser_create.add_argument('-v', '--verbose', action='store_true', help=verbose_text) parser_create.add_argument('-d', '--dump', action='store_true', help='dump test files for DSP-API requests') parser_create.add_argument('datamodelfile', help='path to data model file') parser_get = subparsers.add_parser('get', help='Get the ontology (data model) of a project from the DaSCH Service Platform.') parser_get.set_defaults(action='get') - parser_get.add_argument('-u', '--user', default='root@example.com', help='Username for DSP server') - parser_get.add_argument('-p', '--password', default='test', help='The password for login') - parser_get.add_argument('-s', '--server', type=str, default='http://0.0.0.0:3333', help='URL of the DSP server') + parser_get.add_argument('-u', '--user', default=default_user, help=username_text) + parser_get.add_argument('-p', '--password', default=default_pw, help=password_text) + parser_get.add_argument('-s', '--server', type=str, default=default_localhost, help=url_text) parser_get.add_argument('-P', '--project', type=str, help='Shortcode, shortname or iri of project', required=True) - parser_get.add_argument('-v', '--verbose', action='store_true', help='Verbose feedback') + parser_get.add_argument('-v', '--verbose', action='store_true', help=verbose_text) parser_get.add_argument('datamodelfile', help='Path to the file the ontology should be written to', default='onto.json') parser_upload = subparsers.add_parser('xmlupload', help='Upload data from an XML file to the DaSCH Service Platform.') parser_upload.set_defaults(action='xmlupload') - parser_upload.add_argument('-s', '--server', type=str, default='http://0.0.0.0:3333', help='URL of the DSP server') - parser_upload.add_argument('-u', '--user', type=str, default='root@example.com', help='Username for DSP server') - parser_upload.add_argument('-p', '--password', type=str, default='test', help='The password for login') + parser_upload.add_argument('-s', '--server', type=str, default=default_localhost, help=url_text) + parser_upload.add_argument('-u', '--user', type=str, default=default_user, help=username_text) + parser_upload.add_argument('-p', '--password', type=str, default=default_pw, help=password_text) parser_upload.add_argument('-V', '--validate', action='store_true', help='Do only validation of XML, no upload of the data') parser_upload.add_argument('-i', '--imgdir', type=str, default='.', help='Path to folder containing the images') parser_upload.add_argument('-S', '--sipi', type=str, default='http://0.0.0.0:1024', help='URL of SIPI server') - parser_upload.add_argument('-v', '--verbose', action='store_true', help='Verbose feedback') + parser_upload.add_argument('-v', '--verbose', action='store_true', help=verbose_text) parser_upload.add_argument('-I', '--incremental', action='store_true', help='Incremental XML upload') parser_upload.add_argument('xmlfile', help='path to xml file containing the data', default='data.xml') @@ -116,12 +126,14 @@ def program(user_args: list[str]) -> None: default='properties.json') parser_id2iri = subparsers.add_parser('id2iri', - help='Replace internal IDs in an XML with their corresponding IRIs from a provided JSON file.') + help='Replace internal IDs in an XML with their corresponding IRIs from a provided JSON file.') parser_id2iri.set_defaults(action='id2iri') parser_id2iri.add_argument('xmlfile', help='Path to the XML file containing the data to be replaced') - parser_id2iri.add_argument('jsonfile', help='Path to the JSON file containing the mapping of internal IDs and their respective IRIs') - parser_id2iri.add_argument('--outfile', default=None, help='Path to the XML output file containing the replaced IDs (optional)') - parser_id2iri.add_argument('-v', '--verbose', action='store_true', help='Verbose feedback') + parser_id2iri.add_argument('jsonfile', + help='Path to the JSON file containing the mapping of internal IDs and their respective IRIs') + parser_id2iri.add_argument('--outfile', default=None, + help='Path to the XML output file containing the replaced IDs (optional)') + parser_id2iri.add_argument('-v', '--verbose', action='store_true', help=verbose_text) args = parser.parse_args(user_args) diff --git a/knora/dsplib/models/BUILD.bazel b/knora/dsplib/models/BUILD.bazel index 76a4bf004..0ec07c13f 100644 --- a/knora/dsplib/models/BUILD.bazel +++ b/knora/dsplib/models/BUILD.bazel @@ -4,6 +4,19 @@ load("@rules_python//python:defs.bzl", "py_binary", "py_library") # make the dependencies from requirements.txt available load("@knora_py_deps//:requirements.bzl", "requirement") +py_library( + name = "bitstream", + visibility = ["//visibility:public"], + srcs = ["bitstream.py"], + deps = [ + ":helpers", + ":langstring", + ":permission", + requirement("pystrict"), + ], + imports = ["."], +) + py_library( name = "connection", visibility = ["//visibility:public"], @@ -130,6 +143,7 @@ py_library( visibility = ["//visibility:public"], srcs = ["resource.py"], deps = [ + ":bitstream", ":connection", ":helpers", ":langstring", diff --git a/knora/dsplib/models/bitstream.py b/knora/dsplib/models/bitstream.py new file mode 100644 index 000000000..22b95027f --- /dev/null +++ b/knora/dsplib/models/bitstream.py @@ -0,0 +1,37 @@ +from typing import Dict, Optional, Any + +from pystrict import strict + +from .helpers import Actions +from .permission import Permissions + + +@strict +class Bitstream: + """ + Represents a bitstream object (file) which is attached to a resource + """ + _value: str + _permissions: Optional[Permissions] + + def __init__(self, + value: str, + permissions: Optional[Permissions] = None): + self._value = value + self._permissions = permissions + + @property + def value(self) -> str: + return self._value + + @property + def permissions(self) -> Optional[Permissions]: + return self._permissions + + def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + tmp = {} + if action == Actions.Create: + tmp["knora-api:fileValueHasFilename"] = self._value + if self._permissions: + tmp["knora-api:hasPermissions"] = self.permissions.toJsonLdObj() + return tmp diff --git a/knora/dsplib/models/group.py b/knora/dsplib/models/group.py index 46b6c8542..0496c2381 100644 --- a/knora/dsplib/models/group.py +++ b/knora/dsplib/models/group.py @@ -272,7 +272,6 @@ def print(self): status=True, selfjoin=False).create() new_group.print() - print("iiiii") new_group.name = "GROUP TEST - modified" new_group = new_group.update() diff --git a/knora/dsplib/models/resource.py b/knora/dsplib/models/resource.py index 59edd67c6..1adbad87a 100644 --- a/knora/dsplib/models/resource.py +++ b/knora/dsplib/models/resource.py @@ -7,6 +7,7 @@ from pystrict import strict +from .bitstream import Bitstream from .connection import Connection from .helpers import OntoInfo, Actions, BaseError, Cardinality, Context from .listnode import ListNode @@ -40,6 +41,9 @@ class Propinfo: @strict class ResourceInstance(Model): + """ + Represents a resource instance + """ baseclasses_with_bitstream: Set[str] = { 'StillImageRepresentation', 'AudioRepresentation', @@ -47,99 +51,108 @@ class ResourceInstance(Model): 'MovingImageRepresentation', 'ArchiveRepresentation', 'DDDRepresentation', - 'TextRepresentation'} + 'TextRepresentation' + } knora_properties: Set[str] = { "knora-api:isPartOf", "knora-api:seqnum", } - _iri: Union[str, None] - _ark: Union[str, None] - _vark: Union[str, None] - _label: Union[str, None] - _permissions: Union[Permissions, None] - _upermission: Union[PermissionValue, None] - _bitstream: Union[str, None] - _values: Union[Dict[Value, List[Value]], None] + _iri: Optional[str] + _ark: Optional[str] + _version_ark: Optional[str] + _label: Optional[str] + _permissions: Optional[Permissions] + _user_permission: Optional[PermissionValue] + _bitstream: Optional[Bitstream] + _values: Optional[Dict[Value, List[Value]]] def __init__(self, con: Connection, iri: Optional[str] = None, ark: Optional[str] = None, - vark: Optional[str] = None, + version_ark: Optional[str] = None, label: Optional[str] = None, permissions: Optional[Permissions] = None, - upermission: Optional[PermissionValue] = None, + user_permission: Optional[PermissionValue] = None, bitstream: Optional[str] = None, values: Optional[Dict[ str, Union[str, List[str], Dict[str, str], List[Dict[str, str]], Value, List[Value]]]] = None): + super().__init__(con) self._iri = iri - self._label = label self._ark = ark - self._vark = vark + self._version_ark = version_ark + self._label = label self._permissions = permissions - self._upermission = upermission + self._user_permission = user_permission if self.baseclass in self.baseclasses_with_bitstream and bitstream is None: - raise BaseError("The baseclass \"{}\" requires a bitstream value!".format(self.baseclass)) - if self.baseclass not in self.baseclasses_with_bitstream and bitstream is not None: - raise BaseError("The baseclass \"{}\" does not allow a bitstream value!".format(self.baseclass)) - if self.baseclass in self.baseclasses_with_bitstream and bitstream is not None: + raise BaseError(f"ERROR Baseclass '{self.baseclass}' requires a bitstream value!") + if self.baseclass not in self.baseclasses_with_bitstream and bitstream: + raise BaseError(f"ERROR Baseclass '{self.baseclass}' does not allow a bitstream value!") + if self.baseclass in self.baseclasses_with_bitstream and bitstream: self._bitstream = bitstream else: self._bitstream = None self._values = {} if values: - self._values = {} - for propname, propinfo in self.properties.items(): - # if propinfo.valtype is LinkValue: - vals = values.get(propname) - if vals is not None: - valcnt: int = 0 - if type(vals) is list: # we do have several values for this properties - self._values[propname] = [] - for val in vals: - if valcnt > 0 and ( - propinfo.cardinality == Cardinality.C_0_1 or propinfo.cardinality == Cardinality.C_1): - raise BaseError(f'Cardinality does not allow multiple values for "{propname}"!') + for property_name, property_info in self.properties.items(): + cardinality = property_info.cardinality + value_type = property_info.valtype + value = values.get(property_name) + if value: + # property has multiple values + if type(value) is list: + self._values[property_name] = [] + for val in value: + # check if cardinality allows multiple values for a property + if cardinality == Cardinality.C_0_1 or cardinality == Cardinality.C_1: + raise BaseError(f"ERROR Ontology does not allow multiple values for '{property_name}'!") + if type(val) is Value: - self._values[propname].append(val) + self._values[property_name].append(val) + elif type(val) is dict: - if propinfo.valtype is ListValue: + if value_type is ListValue: val['lists'] = self.lists - self._values[propname].append(propinfo.valtype(**val)) + self._values[property_name].append(value_type(**val)) + else: - if propinfo.valtype is ListValue: + if value_type is ListValue: val = {'value': val, 'lists': self.list} - self._values[propname].append(propinfo.valtype(val)) - valcnt = valcnt + 1 - else: # we do have only one value for this property - if type(vals) is Value: - self._values[propname] = vals - elif type(vals) is dict: - if propinfo.valtype is ListValue: - vals['lists'] = self.lists - self._values[propname] = propinfo.valtype(**vals) + self._values[property_name].append(value_type(val)) + # property has one value + else: + if type(value) is Value: + self._values[property_name] = value + + elif type(value) is dict: + if value_type is ListValue: + value['lists'] = self.lists + self._values[property_name] = value_type(**value) + else: - if propinfo.valtype is ListValue: - vals = {'value': val, 'lists': self.list} - self._values[propname] = propinfo.valtype(vals) + if value_type is ListValue: + value = {'value': value, 'lists': self.list} + self._values[property_name] = value_type(value) else: - if propinfo.cardinality == Cardinality.C_1 or propinfo.cardinality == Cardinality.C_1_n: - raise BaseError("Cardinality does require at least one value for \"{}\"!".format(propname)) - for propname in values: - if propname not in self.knora_properties and self.properties.get(propname) is None: - raise BaseError(f'Property "{propname}" is not part of data model!') + if cardinality == Cardinality.C_1 or cardinality == Cardinality.C_1_n: + raise BaseError(f"ERROR The ontology does require at least one value for '{property_name}'!") - def value(self, item): + for property_name in values: + if property_name not in self.knora_properties and not self.properties.get(property_name): + raise BaseError(f"ERROR Property '{property_name}' is not part of ontology!") + + def value(self, item) -> Optional[list[Value]]: if self._values.get(item): - val = self._values[item] - if isinstance(val, list): - tmp = [x.value for x in val] - return tmp + value = self._values[item] + + # value has multiple values + if isinstance(value, list): + return [x.value for x in value] else: - return val.value + return value.value else: return None @@ -157,7 +170,7 @@ def ark(self) -> str: @property def vark(self) -> str: - return self._vark + return self._version_ark def clone(self) -> 'ResourceInstance': return deepcopy(self) @@ -169,9 +182,9 @@ def fromJsonLdObj(self, con: Connection, jsonld_obj: Any) -> 'ResourceInstance': context = Context(jsonld_obj.get('@context')) newinstance._label = jsonld_obj.get("rdfs:label") newinstance._ark = Value.get_typed_value("knora-api:arkUrl", jsonld_obj) - newinstance._vark = Value.get_typed_value("knora-api:versionArkUrl", jsonld_obj) + newinstance._version_ark = Value.get_typed_value("knora-api:versionArkUrl", jsonld_obj) newinstance._permissions = Permissions.fromString(jsonld_obj.get("knora-api:hasPermissions")) - newinstance._upermission = PermissionValue[jsonld_obj.get("knora-api:userHasPermission", jsonld_obj)] + newinstance._user_permission = PermissionValue[jsonld_obj.get("knora-api:userHasPermission", jsonld_obj)] creation_date = Value.get_typed_value("knora-api:creationDate", jsonld_obj) user = Value.get_typed_value("knora-api:attachedToUser", jsonld_obj) project = Value.get_typed_value("knora-api:attachedToProject", jsonld_obj) @@ -206,62 +219,65 @@ def toJsonLdObj(self, action: Actions) -> Any: "@id": self.project } tmp['rdfs:label'] = self._label + if self._permissions: tmp["knora-api:hasPermissions"] = self._permissions.toJsonLdObj() + if self._bitstream: + bitstream_attributes = { + "knora-api:fileValueHasFilename": self._bitstream["internal_file_name"] + } + + permissions = self._bitstream.get("permissions") + if permissions: + bitstream_attributes["knora-api:hasPermissions"] = permissions.toJsonLdObj() + if self.baseclass == 'StillImageRepresentation': - tmp["knora-api:hasStillImageFileValue"] = { - "@type": "knora-api:StillImageFileValue", - "knora-api:fileValueHasFilename": self._bitstream - } + bitstream_attributes["@type"] = "knora-api:StillImageFileValue" + tmp["knora-api:hasStillImageFileValue"] = bitstream_attributes elif self.baseclass == 'DocumentRepresentation': - tmp["knora-api:hasDocumentFileValue"] = { - "@type": "knora-api:DocumentFileValue", - "knora-api:fileValueHasFilename": self._bitstream - } + bitstream_attributes["@type"] = "knora-api:DocumentFileValue" + tmp["knora-api:hasDocumentFileValue"] = bitstream_attributes elif self.baseclass == 'TextRepresentation': - tmp["knora-api:hasTextFileValue"] = { - "@type": "knora-api:TextFileValue", - "knora-api:fileValueHasFilename": self._bitstream - } + bitstream_attributes["@type"] = "knora-api:TextFileValue" + tmp["knora-api:hasTextFileValue"] = bitstream_attributes elif self.baseclass == 'AudioRepresentation': - tmp["knora-api:hasAudioFileValue"] = { - "@type": "knora-api:AudioFileValue", - "knora-api:fileValueHasFilename": self._bitstream - } + bitstream_attributes["@type"] = "knora-api:AudioFileValue" + tmp["knora-api:hasAudioFileValue"] = bitstream_attributes elif self.baseclass == 'ArchiveRepresentation': - tmp["knora-api:hasArchiveFileValue"] = { - "@type": "knora-api:ArchiveFileValue", - "knora-api:fileValueHasFilename": self._bitstream - } + bitstream_attributes["@type"] = "knora-api:ArchiveFileValue" + tmp["knora-api:hasArchiveFileValue"] = bitstream_attributes else: - raise BaseError(f'Baseclass "{self.baseclass}" not yet supported!') - for propname, valtype in self._values.items(): - if type(valtype) is list: - if type(valtype[0]) is LinkValue: - propname += 'Value' - tmp[propname] = [] - for vt in valtype: - tmp[propname].append(vt.toJsonLdObj(action)) - pass + raise BaseError(f"Baseclass '{self.baseclass}' not yet supported!") + + for property_name, value in self._values.items(): + # if the property has several values + if type(value) is list: + if type(value[0]) is LinkValue: + property_name += 'Value' + # append all values to that property + tmp[property_name] = [] + for vt in value: + tmp[property_name].append(vt.toJsonLdObj(action)) + # if property is a link + elif type(value) is LinkValue: + property_name += 'Value' + tmp[property_name] = value.toJsonLdObj(action) else: - if type(valtype) is LinkValue: - propname += 'Value' - tmp[propname] = valtype.toJsonLdObj(action) + tmp[property_name] = value.toJsonLdObj(action) + tmp['@context'] = self.context - else: - pass return tmp def create(self): jsonobj = self.toJsonLdObj(Actions.Create) jsondata = json.dumps(jsonobj, indent=4, separators=(',', ': '), cls=KnoraStandoffXmlEncoder) - # print(jsondata) + # print("jsondata", jsondata) result = self._con.post('/v2/resources', jsondata) newinstance = self.clone() newinstance._iri = result['@id'] newinstance._ark = result['knora-api:arkUrl']['@value'] - newinstance._vark = result['knora-api:versionArkUrl']['@value'] + newinstance._version_ark = result['knora-api:versionArkUrl']['@value'] return newinstance def read(self) -> 'ResourceInstance': @@ -275,12 +291,12 @@ def delete(self): pass def print(self): - print('Iri:', self._iri) - print('Ark:', self._ark) - print('Vark:', self._vark) + print('IRI:', self._iri) + print('ARK:', self._ark) + print('Version ARK:', self._version_ark) print('Label:', self._label) print('Permissions:', str(self._permissions)) - print('Userpermission:', str(self._upermission)) + print('User permission:', str(self._user_permission)) for name, val in self._values.items(): if isinstance(val, list): tmp = [str(x) for x in val] diff --git a/knora/dsplib/models/value.py b/knora/dsplib/models/value.py index 153a23453..e0eb05725 100644 --- a/knora/dsplib/models/value.py +++ b/knora/dsplib/models/value.py @@ -35,16 +35,18 @@ def replace(self, fromStr: str, toStr: str) -> None: @strict class Value: - _iri: Union[str, None] - _comment: Union[str, None] - _permissions: Union[Permissions, None] - _upermission: Union[PermissionValue, None] - _ark_url: Union[str, None] - _vark_url: Union[str, None] + """ + Represents a value + """ + _iri: Optional[str] + _comment: Optional[str] + _permissions: Optional[Permissions] + _upermission: Optional[PermissionValue] + _ark_url: Optional[str] + _vark_url: Optional[str] def __init__(self, iri: Optional[str] = None, - groups: Optional[Group] = None, comment: Optional[LangString] = None, permissions: Optional[Permissions] = None, upermission: Optional[PermissionValue] = None, @@ -96,13 +98,10 @@ def comment(self): def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: tmp = {} if action == Actions.Create: - if self._permissions is not None: + if self._permissions: tmp["knora-api:hasPermissions"] = self.permissions.toJsonLdObj() - - if self._comment is not None: + if self._comment: tmp["knora-api:valueHasComment"] = str(self._comment) - else: - pass return tmp @staticmethod @@ -119,7 +118,7 @@ def get_typed_value(key: str, jsonld_obj: Any) -> Union[str, float]: result = str(tmp["@value"]) elif tmp.get("@type") == "xsd:dateTimeStamp": result = str(tmp["@value"]) - elif tmp.get("@id") is not None: + elif tmp.get("@id"): result = tmp["@id"] else: raise BaseError("Invalid data type in JSON-LD: \"{}\"!".format(tmp["@type"])) @@ -194,8 +193,6 @@ def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: } else: tmp['knora-api:valueAsString'] = str(self._value) - else: - pass return tmp def __str__(self) -> str: @@ -243,8 +240,6 @@ def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: if action == Actions.Create: tmp['@type'] = "knora-api:ColorValue" tmp['knora-api:colorValueAsColor'] = self._value - else: - pass return tmp def __str__(self) -> str: @@ -355,21 +350,19 @@ def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: tmp["knora-api:dateValueHasCalendar"] = self._calendar tmp["knora-api:dateValueHasStartEra"] = self._e1 tmp["knora-api:dateValueHasStartYear"] = self._y1 - if self._m1 is not None: + if self._m1: tmp["knora-api:dateValueHasStartMonth"] = self._m1 - if self._d1 is not None: + if self._d1: tmp["knora-api:dateValueHasStartDay"] = self._d1 tmp["knora-api:dateValueHasEndEra"] = self._e2 - if self._y2 is not None: + if self._y2: tmp["knora-api:dateValueHasEndYear"] = self._y2 else: tmp["knora-api:dateValueHasEndYear"] = self._y1 - if self._m2 is not None: + if self._m2: tmp["knora-api:dateValueHasEndMonth"] = self._m2 - if self._d2 is not None: + if self._d2: tmp["knora-api:dateValueHasEndDay"] = self._d2 - else: - pass return tmp def __str__(self): @@ -445,8 +438,6 @@ def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: '@type': 'xsd:decimal', '@value': str(self._value) } - else: - pass return tmp def __str__(self) -> str: @@ -488,8 +479,6 @@ def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: if action == Actions.Create: tmp['@type'] = "knora-api:GeomValue" tmp['knora-api:geometryValueAsGeometry'] = self._value - else: - pass return tmp def __str__(self) -> str: @@ -531,8 +520,6 @@ def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: if action == Actions.Create: tmp['@type'] = "knora-api:GeonameValue" tmp['knora-api:geonameValueAsGeonameCode'] = self._value - else: - pass return tmp def __str__(self) -> str: @@ -581,8 +568,6 @@ def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: if action == Actions.Create: tmp['@type'] = "knora-api:IntValue" tmp['knora-api:intValueAsInt'] = self._value - else: - pass return tmp def __str__(self) -> str: @@ -634,8 +619,6 @@ def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: if action == Actions.Create: tmp['@type'] = "knora-api:BooleanValue" tmp['knora-api:booleanValueAsBoolean'] = self._value - else: - pass return tmp def __str__(self) -> str: @@ -684,8 +667,6 @@ def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: "@type": "xsd:anyURI", "@value": self._value } - else: - pass return tmp def __str__(self) -> str: @@ -739,8 +720,6 @@ def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: "@type": "xsd:dateTimeStamp", "@value": self._value } - else: - pass return tmp def __str__(self) -> str: @@ -810,8 +789,6 @@ def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: "@type": "xsd:decimal", "@value": str(self._iv_end) } - else: - pass return tmp def __str__(self) -> str: @@ -833,7 +810,7 @@ def __init__(self, ark_url: Optional[str] = None, vark_url: Optional[str] = None): - def find_listnode(nodes: List[ListNode], name: str) -> Union[str, None]: + def find_listnode(nodes: List[ListNode], name: str) -> Optional[str]: for node in nodes: if node.name == name: return node.id @@ -894,8 +871,6 @@ def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: tmp['knora-api:listValueAsListNode'] = { '@id': self._value } - else: - pass return tmp def __str__(self) -> str: @@ -961,8 +936,6 @@ def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: tmp['knora-api:linkValueHasTargetIri'] = { '@id': self._value } - else: - pass return tmp def __str__(self) -> str: diff --git a/knora/dsplib/schemas/data.xsd b/knora/dsplib/schemas/data.xsd index 4c1b12b8d..5a40896c3 100644 --- a/knora/dsplib/schemas/data.xsd +++ b/knora/dsplib/schemas/data.xsd @@ -27,7 +27,6 @@ - @@ -58,7 +57,7 @@ - + @@ -161,6 +160,7 @@ + @@ -339,10 +339,12 @@ - + - + + + @@ -367,7 +369,7 @@ - + @@ -381,10 +383,10 @@ - + @@ -403,12 +405,11 @@ - - + diff --git a/knora/dsplib/utils/xml_upload.py b/knora/dsplib/utils/xml_upload.py index 090e66788..50089cef9 100644 --- a/knora/dsplib/utils/xml_upload.py +++ b/knora/dsplib/utils/xml_upload.py @@ -67,8 +67,33 @@ def project_name(self) -> Optional[str]: return self._project_name -class KnoraValue: - """Represents a value of a resource in the Knora ontology""" +class XMLBitstream: + """Represents a bitstream object (file) of a resource in the XML used for data import""" + + _value: str + _permissions: str + + def __init__(self, node: etree.Element) -> None: + self._value = node.text + self._permissions = node.get('permissions') + + @property + def value(self) -> str: + """The file path of the bitstream object""" + return self._value + + @property + def permissions(self) -> str: + """Reference to the set of permissions for the bitstream object""" + return self._permissions + + def print(self) -> None: + """Prints the bitstream object and its attributes.""" + print(' Bitstream file path: ' + str(self._value)) + + +class XMLValue: + """Represents a value of a resource property in the XML used for data import""" _value: Union[str, KnoraStandoffXml] _resrefs: Optional[List[str]] @@ -134,12 +159,12 @@ def print(self) -> None: print(' res_ref: ' + i) -class KnoraProperty: - """Represents a property of a resource in the XML""" +class XMLProperty: + """Represents a property of a resource in the XML used for data import""" _name: str _valtype: str - _values: List[KnoraValue] + _values: List[XMLValue] def __init__(self, node: etree.Element, valtype: str, default_ontology: Optional[str] = None): """ @@ -167,9 +192,9 @@ def __init__(self, node: etree.Element, valtype: str, default_ontology: Optional # parse the subnodes of the property nodes which contain the actual values of the property for subnode in node: if subnode.tag == valtype: # the subnode must correspond to the expected value type - self._values.append(KnoraValue(subnode, valtype, listname)) + self._values.append(XMLValue(subnode, valtype, listname)) else: - raise XmlError('Unexpected tag: "{}". Property may contain only value tags!'.format(subnode.tag)) + raise XmlError(f"ERROR Unexpected tag: '{subnode.tag}'. Property may contain only value tags!") @property def name(self) -> str: @@ -182,7 +207,7 @@ def valtype(self) -> str: return self._valtype @property - def values(self) -> List[KnoraValue]: + def values(self) -> List[XMLValue]: """List of values of this property""" return self._values @@ -193,15 +218,15 @@ def print(self) -> None: value.print() -class KnoraResource: - """Represents a resource in the Knora ontology""" +class XMLResource: + """Represents a resource in the XML used for data import""" _id: str _label: str _restype: str - _permissions: str - _bitstream: Optional[str] - _properties: List[KnoraProperty] + _permissions: Optional[str] + _bitstream: Optional[XMLBitstream] + _properties: List[XMLProperty] def __init__(self, node: etree.Element, default_ontology: Optional[str] = None) -> None: """ @@ -223,18 +248,22 @@ def __init__(self, node: etree.Element, default_ontology: Optional[str] = None) self._restype = default_ontology + ':' + tmp_res_type[1] else: self._restype = 'knora-admin:' + tmp_res_type[0] - self._permissions = node.attrib['permissions'] + permissions_tmp = node.attrib.get("permissions") + if permissions_tmp: + self._permissions = node.attrib['permissions'] + else: + self._permissions = None self._bitstream = None self._properties = [] for subnode in node: - if subnode.tag == 'bitstream': - self._bitstream = subnode.text # path to the file - elif subnode.tag is etree.Comment: + if subnode.tag is etree.Comment: continue + elif subnode.tag == 'bitstream': + self._bitstream = XMLBitstream(subnode) else: # get the property type which is in format type-prop, p.ex. prop_type, _ = subnode.tag.split('-') - self._properties.append(KnoraProperty(subnode, prop_type, default_ontology)) + self._properties.append(XMLProperty(subnode, prop_type, default_ontology)) @property def id(self) -> str: @@ -257,15 +286,15 @@ def permissions(self) -> str: return self._permissions @property - def bitstream(self) -> Optional[str]: - """The path to the bitstream object (file) belonging to the resource""" + def bitstream(self) -> Optional[XMLBitstream]: + """The bitstream object belonging to the resource""" return self._bitstream def print(self) -> None: """Prints the resource and its attributes.""" print(f'Resource: id={self._id}, restype: {self._restype}, label: {self._label}') - if self._bitstream is not None: - print(' Bitstream: ' + self._bitstream) + if self._bitstream: + print(' Bitstream: ' + self._bitstream.value) for prop in self._properties: prop.print() @@ -304,10 +333,9 @@ def get_propvals(self, resiri_lookup: Dict[str, str], permissions_lookup: Dict[s for prop in self._properties: vals: List[Union[str, Dict[str, str]]] = [] for value in prop.values: - v: str if prop.valtype == 'resptr': # we have a resptr, therefore simple lookup or IRI iri = resiri_lookup.get(value.value) - if iri is not None: + if iri: v = iri else: v = value.value # if we do not find the id, we assume it's a valid knora IRI @@ -328,17 +356,36 @@ def get_propvals(self, resiri_lookup: Dict[str, str], permissions_lookup: Dict[s else: # we have comment or permissions tmp = {'value': v} - if value.comment is not None: + if value.comment: tmp['comment'] = value.comment - if value.permissions is not None: + if value.permissions: tmp['permissions'] = permissions_lookup.get(value.permissions) vals.append(tmp) prop_data[prop.name] = vals if len(vals) > 1 else vals[0] return prop_data + def get_bitstream(self, internal_file_name_bitstream: str, permissions_lookup: Dict[str, Permissions]) -> Optional[Dict[str, Union[str, Permissions]]]: + """ + Get the bitstream object belonging to the resource + + Args: + internal_file_name_bitstream: Internal file name of bitstream object as returned from Sipi + permissions_lookup: Is used to resolve the permission id's to permission sets + + Returns: + A dict of the bitstream object + """ + tmp = None + if self._bitstream: + bitstream = self._bitstream + tmp = {'value': bitstream.value, 'internal_file_name': internal_file_name_bitstream} + if bitstream.permissions: + tmp['permissions'] = permissions_lookup.get(bitstream.permissions) + return tmp + class XmlAllow: - """Represents the allow element of the XML""" + """Represents the allow element of the XML used for data import""" _group: str _permission: str @@ -391,7 +438,7 @@ def print(self) -> None: class XmlPermission: - """Represents the permission set containing several XmlAllow elements""" + """Represents the permission set containing several XmlAllow elements in the XML used for data import""" _id: str _allows: List[XmlAllow] @@ -439,7 +486,7 @@ def print(self): a.print() -def do_sort_order(resources: List[KnoraResource], verbose) -> List[KnoraResource]: +def do_sort_order(resources: List[XMLResource], verbose) -> List[XMLResource]: """ Sorts a list of resources. @@ -455,8 +502,8 @@ def do_sort_order(resources: List[KnoraResource], verbose) -> List[KnoraResource """ # sort the resources according to outgoing resptrs - ok_resources: [KnoraResource] = [] - notok_resources: [KnoraResource] = [] + ok_resources: [XMLResource] = [] + notok_resources: [XMLResource] = [] ok_res_ids: [str] = [] cnt = 0 notok_len = 9999999 @@ -557,7 +604,7 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s con.login(user, password) proj_context = ProjectContext(con=con) - resources: List[KnoraResource] = [] + resources: List[XMLResource] = [] permissions: Dict[str, XmlPermission] = {} # parse the XML file containing the data @@ -588,7 +635,7 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s permissions[permission.id] = permission # get all resources elif child.tag == "resource": - resources.append(KnoraResource(child, default_ontology)) + resources.append(XMLResource(child, default_ontology)) # sort the resources (resources which do not link to others come first) but only if not an incremental upload if not incremental: @@ -613,30 +660,39 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s failed_uploads = [] for resource in resources: - bitstream = None - try: - if verbose: - resource.print() - if resource.bitstream: - img = sipi.upload_bitstream(os.path.join(imgdir, resource.bitstream)) - bitstream = img['uploadedFiles'][0]['internalFilename'] - - # create the resource on the server - instance = res_classes[resource.restype](con=con, label=resource.label, - permissions=permissions_lookup.get(resource.permissions), - bitstream=bitstream, - values=resource.get_propvals(res_iri_lookup, - permissions_lookup)).create() - res_iri_lookup[resource.id] = instance.iri - print(f"Created resource '{instance.label}' ({resource.id}) with IRI '{instance.iri}'") + if verbose: + resource.print() + + resource_bitstream = None + if resource.bitstream: + img = sipi.upload_bitstream(os.path.join(imgdir, resource.bitstream.value)) + internal_file_name_bitstream = img['uploadedFiles'][0]['internalFilename'] + resource_bitstream = resource.get_bitstream(internal_file_name_bitstream, permissions_lookup) + permissions_tmp = permissions_lookup.get(resource.permissions) + + try: + # create a resource instance (ResourceInstance) from the given resource in the XML (XMLResource) + instance: ResourceInstance = res_classes[resource.restype](con=con, + label=resource.label, + permissions=permissions_tmp, + bitstream=resource_bitstream, + values=resource.get_propvals(res_iri_lookup, + permissions_lookup)).create() except BaseError as err: + print( + f"ERROR while trying to create resource '{resource.label}' ({resource.id}). The error message was: {err.message}") failed_uploads.append(resource.id) - print(f"ERROR while trying to upload '{resource.label}' ({resource.id}). The error message was: {err.message}") + continue except Exception as exception: + print( + f"ERROR while trying to create resource '{resource.label}' ({resource.id}). The error message was: {exception}") failed_uploads.append(resource.id) - print(f"ERROR while trying to upload '{resource.label}' ({resource.id}). The error message was: {exception}") + continue + + res_iri_lookup[resource.id] = instance.iri + print(f"Created resource '{instance.label}' ({resource.id}) with IRI '{instance.iri}'") # write mapping of internal IDs to IRIs to file with timestamp timestamp_now = datetime.now() diff --git a/knora/knora.py b/knora/knora.py deleted file mode 100755 index 9679100ed..000000000 --- a/knora/knora.py +++ /dev/null @@ -1,2108 +0,0 @@ -import json -import pprint -import re -import urllib -from pprint import pprint -from typing import List, Dict, Optional, Any, Union -from urllib.parse import quote_plus - -import requests -import validators -from lxml import etree -from rdflib import Graph -from rfc3987 import parse - -# TODO: recheck all the documentation of this file -""" - Properties in knora-api: - - - :hasValue - - :hasColor - - :hasComment - - :hasGeometry - - :hasLinkTo - - :isPartOf - - :isRegionOf - - :isAnnotationOf - - :seqnum - - Classes in knora-api: - - :Resource - - :StillImageRepresentation - - :TextRepresentation - - :AudioRepresentation - - :DDDRepresentation - - :DocumentRepresentation - - :MovingImageRepresentation - - :ArchiveRepresentation - - :Annotation -> :hasComment, :isAnnotationOf, :isAnnotationOfValue - - :LinkObj -> :hasComment, :hasLinkTo, :hasLinkToValue - - :LinkValue [reification node] - - :Region -> :hasColor, :isRegionOf, :hasGeometry, :isRegionOfValue, :hasComment - - For lists: - - - :ListNode -> :hasSubListNode, :listNodePosition, :listNodeName, :isRootNode, :hasRootNode, :attachedToProject - - Values in knora-api: - - - :Value - - :TextValue -> :SimpleText, :TextArea - - :ColorValue -> :Colorpicker - - :DateValue -> :Date - - :DecimalValue -> :SimpleText - - :GeomValue -> :Geometry - - :GeonameValue -> :Geonames - - :IntValue -> :SimpleText, :Spinbox, :Slider - - :BooleanValue -> :Checkbox - - :UriValue -> :SimpleText - - :IntervalValue - - :ListValue -> :Pulldown - - GUI elements - - - :Colorpicker -> ncolors=integer - - :Date - - :Geometry - - :Geonames - - :Interval - - :List -> hlist(required)= - - :Pulldown -> hlist(required)= - - :Radio -> hlist(required)= - - :Richtext - - :Searchbox -> numprops=integer - - :SimpleText -> maxlength=integer, size=integer - - :Slider -> max(required)=decimal, min(required)=decimal - - :Spinbox -> max=decimal, min=decimal - - :Textarea -> cols=integer, rows=integer, width=percent, wrap=string(soft|hard) - - :Checkbox - - :Fileupload -""" - - -class KnoraError(Exception): - """Handles errors happening in this file""" - - def __init__(self, message): - self.message = message - - -class KnoraStandoffXml: - """Used to handle XML strings for standoff markup""" - - iriregexp = re.compile(r'IRI:[^:]*:IRI') - - def __init__(self, xmlstr: str): - self.xmlstr = xmlstr - - def getXml(self): - return self.xmlstr - - def findall(self): - return KnoraStandoffXml.iriregexp.findall(self.xmlstr) - - def replace(self, fromStr: str, toStr: str): - self.xmlstr.replace(fromStr, toStr) - - -class KnoraStandoffXmlEncoder(json.JSONEncoder): - """Classes used as wrapper for knora standoff-XML""" - - def default(self, obj): - if isinstance(obj, KnoraStandoffXml): - return obj.getXml() - return json.JSONEncoder.default(self, obj) - - -class Knora: - """ - This is the main class which holds all the methods for communication with the Knora backend. - """ - - def __init__(self, server: str, prefixes: Dict[str, str] = None): - """ - Constructor requiring the server address, the user and password of KNORA - :param server: Address of the server, e.g https://api.dasch.swiss - :param prefixes: Ontology prefixes used - """ - self.server = server - self.prefixes = prefixes - self.token = None - - def login(self, email: str, password: str) -> None: - """ - Method to login into KNORA which creates a session token. - :param email: Email of user, e.g., root@example.com - :param password: Password of the user, e.g. test - """ - credentials = { - "email": email, - "password": password - } - jsondata = json.dumps(credentials) - - req = requests.post( - self.server + '/v2/authentication', - headers={'Content-Type': 'application/json; charset=UTF-8'}, - data=jsondata - ) - self.on_api_error(req) - result = req.json() - self.token = result["token"] - - def get_token(self) -> str: - return self.token - - def logout(self) -> None: - if self.token is not None: - req = requests.delete( - self.server + '/v2/authentication', - headers={'Authorization': 'Bearer ' + self.token} - ) - self.on_api_error(req) - self.token = None - - def __del__(self): - self.logout() - - def on_api_error(self, res) -> None: - """ - Method to check for any API errors - :param res: The input to check, usually JSON format - :return: Possible KnoraError that is being raised - """ - - if (res.status_code != 200): - raise KnoraError("KNORA-ERROR: status code=" + str(res.status_code) + "\nMessage:" + res.text) - - if 'error' in res: - raise KnoraError("KNORA-ERROR: API error: " + res.error) - - # ========================================================================== - # project related methods - # - - def get_existing_projects(self, full: bool = False) -> List[Any]: - """Returns a list of existing projects - - :return: List of existing projects - """ - - req = requests.get(self.server + '/admin/projects', - headers={'Authorization': 'Bearer ' + self.token}) - self.on_api_error(req) - result = req.json() - - if 'projects' not in result: - raise KnoraError("KNORA-ERROR:\n Request got no projects!") - else: - if full: - return result['projects'] - else: - return list(map(lambda a: a['id'], result['projects'])) - - def get_project(self, shortcode: str) -> Dict[str, Any]: - """Returns project data of given project - - :param shortcode: Shortcode of object - :return: JSON containing the project information - """ - - url = self.server + '/admin/projects/shortcode/' + shortcode - req = requests.get(url, headers={'Authorization': 'Bearer ' + self.token}) - self.on_api_error(req) - - result = req.json() - - return result["project"] - - def project_exists(self, proj_iri: str) -> bool: - """Checks if a given project exists - - :return: Boolean - """ - - projects = self.get_existing_projects() - return proj_iri in projects - - def create_project( - self, - shortcode: str, - shortname: str, - longname: str, - descriptions: Optional[Dict[str, str]] = None, - keywords: Optional[List[str]] = None, - logo: Optional[str] = None) -> str: - """ - Create a new project - - :param shortcode: Dedicated shortcode of project - :param shortname: Short name of the project (e.g acronym) - :param longname: Long name of project - :param descriptions: Dict of the form {lang: descr, …} for the description of the project [Default: None] - :param keywords: List of keywords - :param logo: Link to the project logo [default: None] - :return: Project IRI - """ - - descriptions = list(map(lambda p: {"language": p[0], "value": p[1]}, descriptions.items())) - - project = { - "shortname": shortname, - "shortcode": shortcode, - "longname": longname, - "status": True, - "selfjoin": False - } - if descriptions is not None: - project['description'] = descriptions - if keywords is not None: - project['keywords'] = keywords - if logo is not None: - project['logo'] = logo - - jsondata = json.dumps(project) - # print(jsondata) - - req = requests.post(self.server + "/admin/projects", - headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - self.on_api_error(req) - - res = req.json() - return res["project"]["id"] - - def update_project( - self, - shortcode: str, - shortname: Optional[str] = None, - longname: Optional[str] = None, - descriptions: Optional[Dict[str, str]] = None, - keywords: Optional[List[str]] = None, - logo: Optional[str] = None) -> str: - """ - Update project information - - :param shortcode: - :param shortname: - :param longname: - :param descriptions: - :param keywords: - :param logo: - :return: - """ - - descriptions = list(map(lambda p: {"language": p[0], "value": p[1]}, descriptions.items())) - - project = { - "longname": longname, - "description": descriptions, - "keywords": keywords, - "logo": logo, - "status": True, - "selfjoin": False - } - - jsondata = json.dumps(project) - url = self.server + '/admin/projects/iri/' + quote_plus("http://rdfh.ch/projects/" + shortcode) - - req = requests.put(url, - headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - self.on_api_error(req) - - res = req.json() - return res['project']['id'] - - # ========================================================================== - # Group related methods - # - - def get_groups(self) -> List[Dict[str, Any]]: - """ - Returns the list of existing groups - - :return: List of projects - """ - url = self.server + '/admin/groups' - - req = requests.get(url, headers={'Authorization': 'Bearer ' + self.token}) - - self.on_api_error(req) - res = req.json() - - return res['groups'] - - def get_group_by_iri(self, group_iri: str) -> Dict[str, Any]: - """ - Returns information about the given group - :param group_iri: IRI of the group - :return: Information about the specific group - """ - url = self.server + '/admin/groups/' + quote_plus(group_iri) - - req = requests.get(url, headers={'Authorization': 'Bearer ' + self.token}) - - self.on_api_error(req) - res = req.json() - - return res['group'] - - def get_group_by_pshortname_and_gname(self, - project_shortname: str, - group_name: str) -> Union[str, None]: - """ - Get a group by project shortname and group name - - :param project_shortname: Project shortname - :param group_name: Group name - :return: IRI of the group - """ - groupinfos = self.get_groups() - for groupinfo in groupinfos: - if groupinfo["project"]["shortname"] == project_shortname and groupinfo["name"] == group_name: - return groupinfo["id"] - return None - - def get_group_by_pshortcode_and_gname(self, - project_shortcode: str, - group_name: str) -> Union[str, None]: - """ - Get a group by project shortcode and group name - - :param project_shortname: Project shortcode - :param group_name: Group name - :return: IRI of the group - """ - groupinfos = self.get_groups() - for groupinfo in groupinfos: - if groupinfo["project"]["shortcode"] == project_shortcode and groupinfo["name"] == group_name: - return groupinfo["id"] - return None - - def get_group_by_piri_and_gname(self, - project_iri: str, - group_name: str) -> Union[str, None]: - """ - Get a group by project shortcode and group name - - :param project_shortname: Project shortcode - :param group_name: Group name - :return: IRI of the group - """ - groupinfos = self.get_groups() - for groupinfo in groupinfos: - if groupinfo["project"]["id"] == project_iri and groupinfo["name"] == group_name: - return groupinfo["id"] - return None - - def create_group(self, - project_iri: str, - name: str, - description: Union[str, Dict[str, str]], - status: bool = True, - selfjoin: bool = False) -> str: - """ - Create a new group - - :param name: Name of the group - :param description: Either a string with the descrioption, or a List of Dicts in the form [{"value": "descr", "language": "lang"},…] - :param project_iri: IRI of the project where the group belongs to - :param status: Active (True) or not active (False) [default: True] - :param selfjoin: ?? [default: False] - :return: IRI of the group - """ - - groupinfo = { - "name": name, - "description": description if isinstance(description, str) else list( - map(lambda p: {"@language": p[0], "@value": p[1]}, description.items())), - "project": project_iri, - "status": status, - "selfjoin": selfjoin - } - jsondata = json.dumps(groupinfo) - - url = self.server + '/admin/groups' - - req = requests.post(url, - headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - - self.on_api_error(req) - res = req.json() - return res['group']['id'] - - def update_group(self, - group_iri: str, - name: Optional[str] = None, - description: Optional[Union[str, Dict[str, str]]] = None, - selfjoin: Optional[bool] = None) -> Union[str, None]: - """ - Modify the data about a group. Only parameters that have to be changed must be indicated - :param group_iri: IRI of the grouo to be modified - :param name: New name of the group [optional] - :param description: Either a string with the descrioption, or a List of Dicts in the form [{"value": "descr", "language": "lang"},…] [optional] - :param selfjoin: True or False [optional] - :return: ??? - """ - - groupinfo = {} - done = False - if name is not None: - groupinfo['name'] = name - done = True - if description is not None: - groupinfo['description'] = description if isinstance(description, str) else list( - map(lambda p: {"@language": p[0], "@value": p[1]}, description.items())) - done = True - if selfjoin is not None: - groupinfo['selfjoin'] = selfjoin - done = True - if done: - jsondata = json.dumps(groupinfo) - - url = self.server + '/admin/groups/' + quote_plus(group_iri) - - req = requests.put(url, - headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - self.on_api_error(req) - res = req.json() - pprint(res) - return res['group']['id'] - else: - return None - - def change_group_status(self, - group_iri: str, - status: bool) -> None: - """ - Change the status of th group - - :param group_iri: IRI of the group - :param status: Status (active: True, inactive: False) - - :return: None - """ - - statusinfo = { - "status": status - } - jsondata = json.dumps(statusinfo) - - url = self.server + '/admin/groups/' + quote_plus(group_iri) + '/status' - - req = requests.put(url, - headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - self.on_api_error(req) - res = req.json() - pprint(res) - - def delete_group(self, - group_iri: str) -> None: - """ - Delete a group - :param group_iri: IRI of the group - :return: - """ - url = self.server + '/admin/groups/' + quote_plus(group_iri) - - req = requests.delete(url, - headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}) - self.on_api_error(req) - res = req.json() - pprint(res) - - # ========================================================================== - # User related methods - # - - def get_users(self) -> List[Dict[str, Any]]: - """ - Get a list of all users - - :return: Json result. - """ - url = self.server + '/admin/users' - req = requests.get(url, headers={'Authorization': 'Bearer ' + self.token}) - - self.on_api_error(req) - res = req.json() - return res['users'] - - def get_user_by_iri(self, user_iri: str): - """ - Get single user - - :return: - """ - url = self.server + '/admin/users/iri/' + quote_plus(user_iri) - req = requests.get(url, headers={'Authorization': 'Bearer ' + self.token}) - - self.on_api_error(req) - res = req.json() - return res['user'] - - def get_user_by_email(self, email: str): - """ - Get a list of all users - - :return: - """ - url = self.server + '/admin/users/email/' + quote_plus(email) - req = requests.get(url, headers={'Authorization': 'Bearer ' + self.token}) - - self.on_api_error(req) - res = req.json() - return res['user'] - - def create_user(self, - username: str, - email: str, - given_name: str, - family_name: str, - password: str, - lang: str = "en", - sysadmin: bool = False): - """ - Create a new user - - :param username: The username for login purposes (must be unique) - :param email: The email address of the user - :param given_name: The given name (surname, "Vorname", ...) - :param family_name: The family name - :param password: The password for the user - :param lang: language code, either "en", "de", "fr", "it" [default: "en"] - :param sysadmin: True if the user has system admin rights - :return: The user ID as IRI - """ - - userinfo = { - "username": username, - "email": email, - "givenName": given_name, - "familyName": family_name, - "password": password, - "status": True, - "lang": lang, - "systemAdmin": sysadmin - } - - jsondata = json.dumps(userinfo) - url = self.server + '/admin/users' - - req = requests.post(url, - headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - - self.on_api_error(req) - res = req.json() - - return res['user']['id'] - - def update_user(self, - user_iri: str, - username: Optional[str] = None, - email: Optional[str] = None, - given_name: Optional[str] = None, - family_name: Optional[str] = None, - password: Optional[str] = None, - lang: Optional[str] = None): - userinfo: Dict[str, Any] = {} - if username is not None: - userinfo["username"] = username - if email is not None: - userinfo["email"] = email - if given_name is not None: - userinfo["givenName"] = given_name - if family_name is not None: - userinfo["familyName"] = family_name - # if password is not None: - # update_user["password"] = password - if lang is not None: - userinfo["lang"] = lang - if len(userinfo) > 0: - url = self.server + '/admin/users/iri/' + quote_plus(user_iri) + '/BasicUserInformation' - jsondata = json.dumps(userinfo) - req = requests.put(url, - headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - self.on_api_error(req) - - def change_user_password(self, - user_iri: str, - admin_password: str, - new_password: str): - data = { - "requesterPassword": admin_password, - "newPassword": new_password - } - url = self.server + '/admin/users/iri/' + quote_plus(user_iri) + '/Password' - jsondata = json.dumps(data) - req = requests.put(url, - headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - self.on_api_error(req) - - def add_user_to_project(self, - user_iri: str, - project_iri: str): - """ - Add a user to a project - - :param user_iri: IRI of the user - :param project_iri: IRI of the project - :return: None - """ - url = self.server + '/admin/users/iri/' + quote_plus(user_iri) + '/project-memberships/' \ - + quote_plus(project_iri) - req = requests.post(url, headers={'Authorization': 'Bearer ' + self.token}) - self.on_api_error(req) - - return None - - def rm_user_from_project(self, - user_iri: str, - project_iri: str): - """ - Remove a user from a project - - :param user_iri: IRI of the user - :param project_iri: IRI of the project - :return: None - """ - url = self.server + '/admin/users/iri/' + quote_plus(user_iri) + '/project-memberships/' + quote_plus( - project_iri) - req = requests.delete(url, headers={'Authorization': 'Bearer ' + self.token}) - self.on_api_error(req) - - return None - - def add_user_to_project_admin(self, - user_iri: str, - project_iri: str) -> None: - """ - Add a user to the project admin group (knora-admin:ProjectAdmin) - - :param user_iri: IRI of user - :param project_iri: IRI of project - :return: None - """ - url = self.server + '/admin/users/iri/' + quote_plus(user_iri) + '/project-admin-memberships/' + quote_plus( - project_iri) - req = requests.post(url, headers={'Authorization': 'Bearer ' + self.token}) - self.on_api_error(req) - return None - - def rm_user_from_project_admin(self, - user_iri: str, - project_iri: str) -> None: - """ - Remove a user from the project admin group - :param user_iri: IRI of user - :param project_iri: IRI of project - :return: None - """ - url = self.server + '/admin/users/iri/' + quote_plus(user_iri) + '/project-admin-memberships/' + quote_plus( - project_iri) - req = requests.delete(url, headers={'Authorization': 'Bearer ' + self.token}) - self.on_api_error(req) - return None - - def add_user_to_sysadmin(self, user_iri: str) -> None: - """ - Add a user to the project admin group (knora-admin:ProjectAdmin) - - :param user_iri: IRI of user - :param project_iri: IRI of project - :return: None - """ - data = { - "systemAdmin": True - } - url = self.server + '/admin/users/iri/' + quote_plus(user_iri) + '/SystemAdmin' - jsondata = json.dumps(data) - req = requests.put(url, headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - self.on_api_error(req) - return None - - def rm_user_from_sysadmin(self, user_iri: str) -> None: - """ - Remove a user from the system admin group - - :param user_iri: IRI of user - :param project_iri: IRI of project - :return: None - """ - data = { - "systemAdmin": False - } - url = self.server + '/admin/users/iri/' + quote_plus(user_iri) + '/SystemAdmin' - jsondata = json.dumps(data) - req = requests.put(url, headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - self.on_api_error(req) - return None - - def add_user_to_group(self, - user_iri: str, - group_iri: str) -> None: - url = self.server + '/admin/users/iri/' + quote_plus(user_iri) + '/group-memberships/' + quote_plus(group_iri) - - req = requests.post(url, headers={'Authorization': 'Bearer ' + self.token}) - self.on_api_error(req) - return None - - def rm_user_from_group(self, - user_iri: str, - group_iri: str) -> None: - url = self.server + '/admin/users/iri/' + quote_plus(user_iri) + '/group-memberships/' + quote_plus(group_iri) - - req = requests.delete(url, headers={'Authorization': 'Bearer ' + self.token}) - self.on_api_error(req) - return None - - # ========================================================================== - # Ontology methods - # - - def get_existing_ontologies(self): - """ - - :return: Returns the metadata of all existing ontologies on v2/ontologies - """ - - req = requests.get(self.server + '/v2/ontologies/metadata', - headers={'Authorization': 'Bearer ' + self.token}) - result = req.json() - - if not '@graph' in result: - raise KnoraError("KNORA-ERROR:\n Request got no graph!") - else: - names = list(map(lambda a: a['@id'], result['@graph'])) - return names - - def get_project_ontologies(self, project_code: str) -> Optional[dict]: - """ - - :param project_code: - :return: - """ - - proj = quote_plus("http://rdfh.ch/projects/" + project_code) - req = requests.get(self.server + "/v2/ontologies/metadata/" + proj, - headers={'Authorization': 'Bearer ' + self.token}) - self.on_api_error(req) - result = req.json() - - if '@graph' in result: # multiple ontologies - ontos = list(map(lambda a: { - 'iri': a['@id'], - 'label': a['rdfs:label'], - 'moddate': a.get('knora-api:lastModificationDate') - }, result['@graph'])) - return ontos - elif '@id' in result: # single ontology - return [{ - 'iri': result['@id'], - 'label': result['rdfs:label'], - 'moddate': result.get('knora-api:lastModificationDate') - }] - else: - return None - - def ontology_exists(self, onto_iri: str): - """ - Checks if an ontology exists - - :param onto_iri: The possible ontology iri - :return: boolean - """ - - ontos = self.get_existing_ontologies() - - return onto_iri in ontos - - def get_ontology_lastmoddate(self, onto_iri: str): - """ - Retrieves the lastModificationDate of a Ontology - - :param onto_iri: The ontology to retrieve the lastModificationDate from. - :return: The lastModificationDate if it exists. Else, this method returns a dict with (id, None). If the ontology does not exist, it return None. - """ - - req = requests.get(self.server + '/v2/ontologies/metadata', - headers={'Authorization': 'Bearer ' + self.token}) - result = req.json() - - all_ontos = {} - - for onto in result['@graph']: - if 'knora-api:lastModificationDate' in onto: - all_ontos.__setitem__(onto['@id'], onto['knora-api:lastModificationDate']) - else: - all_ontos.__setitem__(onto['@id'], None) - - return all_ontos[onto_iri] - - def create_ontology(self, - onto_name: str, - project_iri: str, - label: str) -> Dict[str, str]: - """ - Create a new ontology - - :param onto_name: Name of the omntology - :param project_iri: IRI of the project - :param label: A label property for this ontology - :return: Dict with "onto_iri" and "last_onto_date" - """ - - ontology = { - "knora-api:ontologyName": onto_name, - "knora-api:attachedToProject": { - "@id": project_iri - }, - "rdfs:label": label, - "@context": { - "rdfs": 'http://www.w3.org/2000/01/rdf-schema#', - "knora-api": 'http://api.knora.org/ontology/knora-api/v2#' - } - } - - jsondata = json.dumps(ontology) - - req = requests.post(self.server + "/v2/ontologies", - headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - - self.on_api_error(req) - - res = req.json() - # TODO: return also ontology name - return {"onto_iri": res['@id'], "last_onto_date": res['knora-api:lastModificationDate']} - - def delete_ontology(self, onto_iri: str, last_onto_date=None): - """ - A method to delete an ontology from /v2/ontologies - - :param onto_iri: The ontology to delete - :param last_onto_date: the lastModificationDate of an ontology. None by default - :return: - """"" # TODO: add return documentation - url = self.server + "/v2/ontologies/" + urllib.parse.quote_plus(onto_iri) - req = requests.delete(url, - params={"lastModificationDate": last_onto_date}, - headers={'Authorization': 'Bearer ' + self.token}) - self.on_api_error(req) - res = req.json() - return res - - def get_ontology_graph(self, - shortcode: str, - name: str): - """ - Returns the turtle definition of the ontology. - - :param shortcode: Shortcode of the project - :param name: Name of the ontology - :return: - """ - url = self.server + "/ontology/" + shortcode + "/" + name + "/v2" - turtle = requests.get(url, - headers={"Accept": "text/turtle", - 'Authorization': 'Bearer ' + self.token}) - self.on_api_error(turtle) - return turtle.text - - def create_res_class(self, - onto_iri: str, - onto_name: str, - last_onto_date: str, - class_name: str, - super_class: List[str], - labels: Dict[str, str], - comments: Optional[Dict[str, str]] = None, - permissions: Optional[Dict[str, str]] = None) -> Dict[str, str]: - """Creates a knora resource class - - :param onto_iri: IRI of the ontology - :param onto_name: Name of the ontology - :param last_onto_date: Last modification date as returned by last call - :param class_name: Name of the class to be created - :param super_class: List of super classes - :param labels: Dict with labels in the form { lang: labeltext } - :param comments: Dict with comments in the form { lang: commenttext } - :param permissions: Dict with permissions in the form - :return: Dict with "class_iri" and "last_onto_date" - """ - - # - # using map and iterable to get the proper format - # - labels = list(map(lambda p: {"@language": p[0], "@value": p[1]}, labels.items())) - - if not comments: - comments = {"en": "none"} - - # - # using map and iterable to get the proper format - # - comments = list(map(lambda p: {"@language": p[0], "@value": p[1]}, comments.items())) - - res_class = { - "@id": onto_iri, - "@type": "owl:Ontology", - "knora-api:lastModificationDate": last_onto_date, - "@graph": [{ - "@id": onto_name + ":" + class_name, - "@type": "owl:Class", - "rdfs:label": labels, - "rdfs:comment": comments, - "rdfs:subClassOf": { - "@id": super_class - } - }], - "@context": { - "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", - "knora-api": "http://api.knora.org/ontology/knora-api/v2#", - "owl": "http://www.w3.org/2002/07/owl#", - "rdfs": "http://www.w3.org/2000/01/rdf-schema#", - "xsd": "http://www.w3.org/2001/XMLSchema#", - onto_name: onto_iri + "#" - } - } - - jsondata = json.dumps(res_class, indent=3, separators=(',', ': ')) - - req = requests.post(self.server + "/v2/ontologies/classes", - headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - self.on_api_error(req) - - res = req.json() - return {"class_iri": res['@graph'][0]['@id'], "last_onto_date": res['knora-api:lastModificationDate']} - - def create_property( - self, - onto_iri: str, - onto_name: str, - last_onto_date: str, - prop_name: str, - super_props: List[str], - labels: Dict[str, str], - gui_element: str, - gui_attributes: List[str] = None, - subject: Optional[str] = None, - object: Optional[str] = None, - comments: Optional[Dict[str, str]] = None - ) -> Dict[str, str]: - """Create a Knora property - - :param onto_iri: IRI of the ontology - :param onto_name: Name of the Ontology (prefix) - :param last_onto_date: Last modification date as returned by last call - :param prop_name: Name of the property - :param super_props: List of super-properties - :param labels: Dict with labels in the form { lang: labeltext } - :param gui_element: Valid GUI-Element - :param gui_attributes: Valid GUI-Attributes (or None) - :param subject: Full name (prefix:name) of subject resource class - :param object: Full name (prefix:name) of object resource class - :param comments: Dict with comments in the form { lang: commenttext } - :return: Dict with "prop_iri" and "last_onto_date" keys - """ - # - # using map and iterable to get the proper format - # - labels = list(map(lambda p: {"@language": p[0], "@value": p[1]}, labels.items())) - - if not comments: - comments = {"en": "none"} - - # - # using map and iterable to get the proper format - # - comments = list(map(lambda p: {"@language": p[0], "@value": p[1]}, comments.items())) - - additional_context = {} - for sprop in super_props: - pp = sprop.split(':') - if pp[0] != "knora-api": - additional_context[pp[0]] = self.prefixes[pp[0]] - - # - # using map and iterable to get the proper format - # - super_props = list(map(lambda x: {"@id": x}, super_props)) - if len(super_props) == 1: - super_props = super_props[0] - - propdata = { - "@id": onto_name + ":" + prop_name, - "@type": "owl:ObjectProperty", - "rdfs:label": labels, - "rdfs:comment": comments, - "rdfs:subPropertyOf": super_props, - "salsah-gui:guiElement": { - "@id": gui_element - } - } - if subject: - propdata["knora-api:subjectType"] = { - "@id": subject - } - - if object: - propdata["knora-api:objectType"] = { - "@id": object - } - - if gui_attributes: - propdata["salsah-gui:guiAttribute"] = gui_attributes - - property = { - "@id": onto_iri, - "@type": "owl:Ontology", - "knora-api:lastModificationDate": last_onto_date, - "@graph": [ - propdata - ], - "@context": { - "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", - "knora-api": "http://api.knora.org/ontology/knora-api/v2#", - "salsah-gui": "http://api.knora.org/ontology/salsah-gui/v2#", - "owl": "http://www.w3.org/2002/07/owl#", - "rdfs": "http://www.w3.org/2000/01/rdf-schema#", - "xsd": "http://www.w3.org/2001/XMLSchema#", - onto_name: onto_iri + "#" - } - } - property["@context"].update(additional_context) - jsondata = json.dumps(property, indent=3, separators=(',', ': ')) - req = requests.post(self.server + "/v2/ontologies/properties", - headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - self.on_api_error(req) - - res = req.json() - return {"prop_iri": res['@graph'][0]['@id'], "last_onto_date": res['knora-api:lastModificationDate']} - - def create_cardinality( - self, - onto_iri: str, - onto_name: str, - last_onto_date: str, - class_iri: str, - prop_iri: str, - occurrence: str, - gui_order: Optional[int] = None - ) -> Dict[str, str]: - """Add a property with a given cardinality to a class - - :param onto_iri: IRI of the ontology - :param onto_name: Name of the ontology (prefix) - :param last_onto_date: Last modification date as returned by last call - :param class_iri: IRI of the class to which the property will be added - :param prop_iri: IRI of the property that should be added - :param occurrence: Occurrence: "1", "0-1", "0-n" or "1-n" - :param gui_order: Ordering of properties in GUI - :return: Dict with "last_onto_date" key - """ - switcher = { - "1": ("owl:cardinality", 1), - "0-1": ("owl:maxCardinality", 1), - "0-n": ("owl:minCardinality", 0), - "1-n": ("owl:minCardinality", 1) - } - occurrence = switcher.get(occurrence) - if not occurrence: - KnoraError("KNORA-ERROR:\n Invalid occurrence!") - - cardinality = { - "@id": onto_iri, - "@type": "owl:Ontology", - "knora-api:lastModificationDate": last_onto_date, - "@graph": [{ - "@id": class_iri, - "@type": "owl:Class", - "rdfs:subClassOf": { - "@type": "owl:Restriction", - occurrence[0]: occurrence[1], - "owl:onProperty": { - "@id": prop_iri - } - } - }], - "@context": { - "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", - "owl": "http://www.w3.org/2002/07/owl#", - "rdfs": "http://www.w3.org/2000/01/rdf-schema#", - "xsd": "http://www.w3.org/2001/XMLSchema#", - "knora-api": "http://api.knora.org/ontology/knora-api/v2#", - "salsah-gui": "http://api.knora.org/ontology/salsah-gui/v2#", - onto_name: onto_iri + "#" - } - } - if gui_order is not None: - cardinality['@graph'][0]["rdfs:subClassOf"]["salsah-gui:guiOrder"] = int(gui_order) - - jsondata = json.dumps(cardinality, indent=3, separators=(',', ': ')) - - req = requests.post(self.server + "/v2/ontologies/cardinalities", - headers={'Content-Type': 'application/ld+json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - self.on_api_error(req) - - res = req.json() - - return {"last_onto_date": res["knora-api:lastModificationDate"]} - - def create_list_node(self, - project_iri: str, - labels: Dict[str, str], - comments: Optional[Dict[str, str]] = None, - name: Optional[str] = None, - parent_iri: Optional[str] = None) -> str: - """ - Creates a new list node. If there is no parent, a root node is created - - :param project_iri: IRI of the project - :param labels: Dict in the form {lang: label, …} giving the label(s) - :param comments: Dict in the form {lang: comment, …} giving the comment(s) - :param name: Name of the list node - :param parent_iri: None for root node (or omit), otherwise IRI of parent node - :return: IRI of list node - """ - - # - # using map and iterable to get the proper format - # - labels = list(map(lambda p: {"language": p[0], "value": p[1]}, labels.items())) - - listnode = { - "projectIri": project_iri, - "labels": labels, - } - - # - # using map and iterable to get the proper format - # - if comments is not None: - listnode["comments"] = list(map(lambda p: {"language": p[0], "value": p[1]}, comments.items())) - else: - listnode["comments"] = [] - - if name is not None: - listnode["name"] = name - - if parent_iri is not None: - listnode["parentNodeIri"] = parent_iri - url = self.server + "/admin/lists/" + quote_plus(parent_iri) - else: - url = self.server + "/admin/lists" - - jsondata = json.dumps(listnode, indent=3, separators=(',', ': ')) - - req = requests.post(url, - headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsondata) - self.on_api_error(req) - - res = req.json() - - if parent_iri is not None: - return res['nodeinfo']['id'] - else: - return res['list']['listinfo']['id'] - - def get_lists(self, shortcode: str): - """ - Get the lists belonging to a certain project identified by its shortcode - :param shortcode: Project shortcode - - :return: JSON with the lists - """ - url = self.server + "/admin/lists?projectIri=" + quote_plus("http://rdfh.ch/projects/" + shortcode) - req = requests.get(url, headers={'Authorization': 'Bearer ' + self.token}) - self.on_api_error(req) - return req.json() - - def get_complete_list(self, list_iri: str): - """ - Get all the data (nodes) of a specific list - - :param list_iri: IRI of the list - :return: JSON containing the list info including all nodes - """ - url = self.server + "/admin/lists/" + quote_plus(list_iri) - req = requests.get(url, headers={'Authorization': 'Bearer ' + self.token}) - self.on_api_error(req) - return req.json() - - def get_resource_by_label(self, - label: str, - res_class: Optional[str] = None, - limit_to_project: Optional[str] = None, - offset: Optional[int] = None): - url = self.server + "/v2/searchbylabel/" + label - option = False - if res_class is not None: - url += '?limitToResourceClass=' + quote_plus(res_class) - option = True - if limit_to_project is not None: - if option: - url += '&limitToProject=' + quote_plus(limit_to_project) - else: - url += '?limitToProject=' + quote_plus(limit_to_project) - option = True - if offset is not None: - if option: - url += '&offset=' + quote_plus(limit_to_project) - else: - url += '&offset=' + quote_plus(limit_to_project) - req = requests.get(url, headers={'Authorization': 'Bearer ' + self.token}) - self.on_api_error(req) - return req.json() - - def create_resource(self, - schema: Dict, - res_class: str, - label: str, - values: Dict, - permissions: Optional[str] = None, - stillimage: Optional[str] = None): - """ - This method creates a new resource (instance of a resource class) with the - default permissions. - - :param schema: The schema of the ontology as returned by the method "create_schema()" - :param res_class: The resource class of the resource to be created - :param label: The "rdfs:label" to be given to the new resource - :param values: A dict with the property values. It has the form - { property_name: value, property_name: value,… } or { property_name: [value1, value2,…],… } - The format of the values depends on the value types. E.g. a calendar date has the form - "GREGORIAN:CE:1920-03-12:CE:1921:05:21" where all values except the start year are optional. - :param stillimage: Path to a still image... - :return: A dict in the form { 'iri': resource_iri, 'ark': ark_id, 'vark': dated_ark_id } - """ - - ontoname = schema["ontoname"] - props = schema['resources'][res_class] # this is an array of all properties defined in the ontology - - # we start building the dict that will be transformed into the JSON-LD - jsondata = { - '@type': ontoname + ":" + res_class, - 'rdfs:label': label, - "knora-api:attachedToProject": { - "@id": schema['proj_iri'] - } - } - - if permissions is not None: - jsondata["knora-api:hasPermissions"] = permissions - - if stillimage is not None: - jsondata["knora-api:hasStillImageFileValue"] = { - "@type": "knora-api:StillImageFileValue", - "knora-api:fileValueHasFilename": stillimage - } - - def create_valdict(val): - """ - Internal function to create the JSON-LD for one value - :param val: the value - :return: Dict propared for the JSON-LD for one value - - """ - - if type(val) is dict: - pprint(val) - comment = val.get('comment') - permissions = val.get('permissions') - mapping = val.get('mapping') - val = val.get('value') - else: - comment = None - permissions = None - mapping = None - - valdict = { - '@type': 'knora-api:' + prop["otype"] - } - - if permissions is not None: - valdict["knora-api:hasPermissions"] = permissions - - if comment is not None: - valdict["knora-api:valueHasComment"] = comment - - if prop["otype"] == "TextValue": - if isinstance(val, KnoraStandoffXml): # text with XML markup - valdict['knora-api:textValueAsXml'] = val # no conversion to string - valdict['knora-api:textValueHasMapping'] = { - '@id': 'http://rdfh.ch/standoff/mappings/StandardMapping' if mapping is None else mapping - } - else: # normal text string without markup - valdict['knora-api:valueAsString'] = str(val) - elif prop["otype"] == "ColorValue": - # - # a color value as used in HTML (e.g. "#aaccff" - # - res = re.match('^#(?:[0-9a-fA-F]{3}){1,2}$', str(val)) - if res is None: - raise KnoraError("Invalid ColorValue format! " + str(val)) - valdict['knora-api:colorValueAsColor'] = str(val) - elif prop["otype"] == "DateValue": - # - # A knora date value - # - res = re.match( - '(GREGORIAN:|JULIAN:)?(CE:|BCE:)?(\d{4})?(-\d{1,2})?(-\d{1,2})?(:CE|:BCE)?(:\d{4})?(-\d{1,2})?(-\d{1,2})?', - str(val)) - if res is None: - raise KnoraError("Invalid date format! " + str(val)) - dp = res.groups() - calendar = 'GREGORIAN' if dp[0] is None else dp[0].strip('-: ') - e1 = 'CE' if dp[1] is None else dp[1].strip('-: ') - y1 = None if dp[2] is None else int(dp[2].strip('-: ')) - m1 = None if dp[3] is None else int(dp[3].strip('-: ')) - d1 = None if dp[4] is None else int(dp[4].strip('-: ')) - e2 = 'CE' if dp[5] is None else dp[5].strip('-: ') - y2 = None if dp[6] is None else int(dp[6].strip('-: ')) - m2 = None if dp[7] is None else int(dp[7].strip('-: ')) - d2 = None if dp[8] is None else int(dp[8].strip('-: ')) - if y1 is None: - raise KnoraError("Invalid date format! " + str(val)) - if y2 is not None: - date1 = y1 * 10000 - if m1 is not None: - date1 += m1 * 100 - if d1 is not None: - date1 += d1 - date2 = y2 * 10000 - if m2 is not None: - date2 += m2 * 100 - if d2 is not None: - date2 += d2 - if date1 > date2: - y1, y2 = y2, y1 - m1, m2 = m2, m1 - d1, d2 = d2, d1 - valdict["knora-api:dateValueHasCalendar"] = calendar - valdict["knora-api:dateValueHasStartEra"] = e1 - valdict["knora-api:dateValueHasStartYear"] = int(y1) - if m1 is not None: - valdict["knora-api:dateValueHasStartMonth"] = int(m1) - if d1 is not None: - valdict["knora-api:dateValueHasStartDay"] = int(d1) - valdict["knora-api:dateValueHasEndEra"] = e2 - if y2 is not None: - valdict["knora-api:dateValueHasEndYear"] = int(y2) - else: - valdict["knora-api:dateValueHasEndYear"] = int(y1) - if m2 is not None: - valdict["knora-api:dateValueHasEndMonth"] = int(m2) - if d2 is not None: - valdict["knora-api:dateValueHasEndDay"] = int(d2) - elif prop["otype"] == "DecimalValue": - # - # a decimal value - # - valdict['knora-api:decimalValueAsDecimal'] = { - '@type': 'xsd:decimal', - '@value': str(val) - } - elif prop["otype"] == "GeomValue": - # - # A geometry ID - # - valdict['knora-api:geometryValueAsGeometry'] = str(val) - elif prop["otype"] == "GeonameValue": - # - # A geoname ID - # - valdict['knora-api:geonameValueAsGeonameCode'] = str(val) - elif prop["otype"] == "IntValue": - # - # an integer value - # - valdict['knora-api:intValueAsInt'] = int(val) - elif prop["otype"] == "BooleanValue": - # - # a boolean value - # - if type(val) == bool: - valdict['knora-api:booleanValueAsBoolean'] = val - elif type(val) == str: - if val.upper() == 'TRUE': - valdict['knora-api:booleanValueAsBoolean'] = True - elif val.upper() == 'FALSE': - valdict['knora-api:booleanValueAsBoolean'] = False - else: - raise KnoraError("Invalid boolean format! " + str(val)) - elif type(val) == int: - if val == 0: - valdict['knora-api:booleanValueAsBoolean'] = False - else: - valdict['knora-api:booleanValueAsBoolean'] = True - elif prop["otype"] == "UriValue": - # - # an URI - # - valdict['knora-api:uriValueAsUri'] = { - "@type": "xsd:anyURI", - "@value": str(val) - } - elif prop["otype"] == "TimeValue": - # - # an URI - # - valdict['knora-api:timeValueAsTime'] = { - "@type": "xsd:dateTime", - "@value": str(val) - } - elif prop["otype"] == "IntervalValue": - # - # an interval in the form "1.356:2.456" - # - iv = val.split(':') - valdict["knora-api:intervalValueHasEnd"] = { - "@type": "xsd:decimal", - "@value": str(iv[0]) - } - valdict["knora-api:intervalValueHasStart"] = { - "@type": "xsd:decimal", - "@value": str(iv[1]) - } - elif prop["otype"] == "ListValue": - try: - iriparts = parse(str(val), rule='IRI') - if iriparts['scheme'] == 'http' or iriparts['scheme'] == 'https': - valdict['knora-api:listValueAsListNode'] = { - '@id': str(val) - } - else: - if iriparts['authority'] is not None: - raise KnoraError("Invalid list node: \"" + str(val) + "\" !") - listname = iriparts['scheme'] - nodename = iriparts['path'] - for node in schema['lists'][listname]['nodes']: - found = False - if node['name'] == nodename: - valdict['knora-api:listValueAsListNode'] = { - '@id': node['id'] - } - found = True - break - if not found: - raise KnoraError("Invalid list node: \"" + str(val) + "\" !") - except ValueError as err: - raise KnoraError("Invalid list node: \"" + str(val) + "\" !") - - elif prop["otype"] == "LinkValue": - valdict['@type'] = 'knora-api:LinkValue' - valdict['knora-api:linkValueHasTargetIri'] = { - '@id': str(val) - } - else: - if prop['otype'] in schema['link_otypes']: - valdict['@type'] = 'knora-api:LinkValue' - valdict['knora-api:linkValueHasTargetIri'] = { - '@id': str(val) - } - else: - raise KnoraError("Invalid otype: " + prop['otype']) - - return valdict - - for key, value in values.items(): - prop = None - for tmpprop in props: - if tmpprop['propname'] == key: - prop = tmpprop - if prop is None: - raise KnoraError("Property " + key + " not known!") - - if prop['otype'] == "LinkValue" or prop['otype'] in schema['link_otypes']: - nkey = key + "Value" - else: - nkey = key - if type(value) is list: - valarr = [] - for val in value: - valarr.append(create_valdict(val)) - jsondata[ontoname + ':' + nkey] = valarr - else: - jsondata[ontoname + ':' + nkey] = create_valdict(value) - - jsondata['@context'] = { - "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", - "knora-api": "http://api.knora.org/ontology/knora-api/v2#", - "rdfs": "http://www.w3.org/2000/01/rdf-schema#", - "xsd": "http://www.w3.org/2001/XMLSchema#", - ontoname: schema['onto_iri'] + '#' - } - - jsonstr = json.dumps(jsondata, indent=3, separators=(',', ': '), cls=KnoraStandoffXmlEncoder) - print(jsonstr) - url = self.server + "/v2/resources" - req = requests.post(url, - headers={'Content-Type': 'application/json; charset=UTF-8', - 'Authorization': 'Bearer ' + self.token}, - data=jsonstr) - self.on_api_error(req) - - res = req.json() - - return { - 'iri': res['@id'], - 'ark': res['knora-api:arkUrl']['@value'], - 'vark': res['knora-api:versionArkUrl']['@value'] - } - - def list_creator(self, children: List): - """ - internal Helper function - - :param children: - :return: - """ - if len(children) == 0: - res = list(map(lambda a: {"name": a["name"], "id": a["id"]}, children)) - else: - res = list( - map(lambda a: {"name": a["name"], "id": a["id"], "nodes": self.list_creator(a["children"])}, children)) - return res - - def create_schema(self, shortcode: str, shortname: str): - """ - This method extracts the ontology from the ontology information it gets from Knora. It - gets the ontology information as n3-data using the Knora API and concerts into a convenient - python dict that can be used for further processing. It is required by the bulk import processing - routines. - - :param shortcode: Shortcode of the project - :param shortname: Short name of the ontolopgy - :return: Dict with a simple description of the ontology - """ - turtle = self.get_ontology_graph(shortcode, shortname) - # print(turtle) - g = Graph() - g.parse(format='n3', data=turtle) - - # Get project and ontology IRI's - sparql = """ - SELECT ?onto ?proj - WHERE { - ?onto a owl:Ontology . - ?onto knora-api:attachedToProject ?proj - } - """ - qres = g.query(sparql) - for row in qres: - proj_iri = row.proj.toPython() # project IRI - onto_iri = row.onto.toPython() # ontology IRI - - sparql = """ - SELECT ?res ?prop ?superprop ?otype ?guiele ?attr ?card ?cardval - WHERE { - ?res a owl:Class . - ?res rdfs:subClassOf ?restriction . - ?restriction a owl:Restriction . - ?restriction owl:onProperty ?prop . - ?restriction ?card ?cardval . - ?prop a owl:ObjectProperty . - ?prop knora-api:objectType ?otype . - ?prop salsah-gui:guiElement ?guiele . - ?prop rdfs:subPropertyOf ?superprop . - OPTIONAL { ?prop salsah-gui:guiAttribute ?attr } . - FILTER(?card = owl:cardinality || ?card = owl:maxCardinality || ?card = owl:minCardinality) - } - ORDER BY ?res ?prop - """ - qres = g.query(sparql) - - resources = {} - resclass = '' - propname = '' - link_otypes = [] - propcnt = 0 - propindex = {} # we have to keep the order of the properties as given in the ontology.... - for row in qres: - - nresclass = row.res.toPython() - nresclass = nresclass[nresclass.find('#') + 1:] - if resclass != nresclass: - resclass = nresclass - resources[resclass] = [] - propcnt = 0 - superprop = row.superprop.toPython() - superprop = superprop[superprop.find('#') + 1:] - if superprop == 'hasLinkToValue': # we ignore this one.... - continue - npropname = row.prop.toPython() - npropname = npropname[npropname.find('#') + 1:] - attr = row.attr.toPython() if row.attr is not None else None - if attr is not None: - attr = attr.split('=') - if propname == npropname: - propcnt -= 1 - - # process attribute (there might be multiple attributes) - if attr is not None: - if resources[resclass][propcnt]["attr"] is not None: # TODO: why is this necessary??? - resources[resclass][propcnt]["attr"][attr[0]] = attr[1].strip('<>') - - # process superprop (there might be multiple superprops) - if superprop not in resources[resclass][propcnt]["superprop"]: - resources[resclass][propcnt]["superprop"].append(superprop) - # pprint.pprint(resources[resclass]) - propcnt += 1 - continue - else: - propname = npropname - objtype = row.otype.toPython() - objtype = objtype[objtype.find('#') + 1:] - card = row.card.toPython() - card = card[card.find('#') + 1:] - guiele = row.guiele.toPython() - guiele = guiele[guiele.find('#') + 1:] - resources[resclass].append({ - "propname": propname, - "otype": objtype, - "superprop": [superprop], - "guiele": guiele, - "attr": {attr[0]: attr[1].strip('<>')} if attr is not None else None, - "card": card, - "cardval": row.cardval.toPython() - }) - # pprint.pprint(resources[resclass]) - if superprop == "hasLinkTo": - link_otypes.append(objtype) - propindex[propname] = propcnt - propcnt += 1 - - # Get info about lists attached to the project - listdata = {} - lists = self.get_lists(shortcode) - lists = lists["lists"] - for list in lists: - tmp = self.get_complete_list(list["id"]) - clist = tmp["list"] - listdata[clist["listinfo"]["name"]] = { - "id": clist["listinfo"]["id"], - "nodes": self.list_creator(clist["children"]) - } - schema = { - "proj_iri": proj_iri, - "shortcode": shortcode, - "ontoname": shortname, - "onto_iri": onto_iri, - "lists": listdata, - "resources": resources, - "link_otypes": link_otypes - } - return schema - - def reset_triplestore_content(self): - rdfdata = [ - { - "path": "./knora-ontologies/knora-admin.ttl", - "name": "http://www.knora.org/ontology/knora-admin" - }, - { - "path": "./knora-ontologies/knora-base.ttl", - "name": "http://www.knora.org/ontology/knora-base" - }, - { - "path": "./knora-ontologies/standoff-onto.ttl", - "name": "http://www.knora.org/ontology/standoff" - }, - { - "path": "./knora-ontologies/standoff-data.ttl", - "name": "http://www.knora.org/data/standoff" - }, - { - "path": "./knora-ontologies/salsah-gui.ttl", - "name": "http://www.knora.org/ontology/salsah-gui" - }, - { - "path": "./_test_data/all_data/admin-data.ttl", - "name": "http://www.knora.org/data/admin" - }, - { - "path": "./_test_data/all_data/permissions-data.ttl", - "name": "http://www.knora.org/data/permissions" - }, - { - "path": "./_test_data/all_data/system-data.ttl", - "name": "http://www.knora.org/data/0000/SystemProject" - } - ] - jsondata = json.dumps(rdfdata) - url = self.server + '/admin/store/ResetTriplestoreContent?prependdefaults=false' - - req = requests.post(url, - headers={'Content-Type': 'application/json; charset=UTF-8'}, - data=jsondata) - self.on_api_error(req) - res = req.json() - # pprint(res) - return res - - -class Sipi: - def __init__(self, sipiserver: str, token: str): - self.sipiserver = sipiserver - self.token = token - - def on_api_error(self, res): - """ - Method to check for any API errors - :param res: The input to check, usually JSON format - :return: Possible KnoraError that is being raised - """ - - if (res.status_code != 200): - raise KnoraError("SIPI-ERROR: status code=" + str(res.status_code) + "\nMessage:" + res.text) - - if 'error' in res: - raise KnoraError("SIPI-ERROR: API error: " + res.error) - - def upload_image(self, filepath): - files = { - 'file': (filepath, open(filepath, 'rb')), - } - req = requests.post(self.sipiserver + "/upload?token=" + self.token, - files=files) - self.on_api_error(req) - res = req.json() - return res - - -class BulkImport: - def __init__(self, schema: Dict): - self.schema = schema - self.proj_prefix = 'p' + schema['shortcode'] + '-' + schema["ontoname"] - self.proj_iri = "http://api.knora.org/ontology/" + schema['shortcode'] + "/" + schema[ - "ontoname"] + "/xml-import/v1#" - self.xml_prefixes = { - None: self.proj_iri, - "xsi": "http://www.w3.org/2001/XMLSchema-instance", - self.proj_prefix: self.proj_iri, - "knoraXmlImport": "http://api.knora.org/ontology/knoraXmlImport/v1#" - } - self.root = etree.Element('{http://api.knora.org/ontology/knoraXmlImport/v1#}resources', - nsmap=self.xml_prefixes) - self.project_shortcode = schema["shortcode"] - - def new_xml_element(self, tag: str, options: Dict = None, value: str = None): - tagp = tag.split(':') - if len(tagp) > 1: - fulltag = '{' + self.xml_prefixes.get(tagp[0]) + '}' + tagp[1] - else: - fulltag = tagp[0] - if options is None: - ele = etree.Element(fulltag) - else: - ele = etree.Element(fulltag, options) - if value is not None: - ele.text = value - return ele - - def write_xml(self, filename: str): - # print(etree.tostring(self.root, pretty_print=True, xml_declaration=True, encoding='utf-8')) - f = open(filename, "wb") - f.write(etree.tostring(self.root, pretty_print=True, xml_declaration=True, encoding='utf-8')) - f.close() - - def get_xml_string(self): - """ - This method returns the Bulk-Import XML as an UTF-8 encoded string. - :return: UTF-8 encoded string. - """ - string = etree.tostring(self.root, pretty_print=True, xml_declaration=True, encoding='utf-8') - return string - - def upload(self, user, password, hostname, port): - """ - Upload the Bulk-Import XML to the server. - :param user: the email of the user - :param password: the password of the user - :param hostname: the hostname (e.g., localhost, api.example.org, etc.) - :param port: the port where the API is running (e.g., 3333) - :return: the JSON response - """ - project_iri = "http://rdfh.ch/projects/" + self.project_shortcode - url_encoded_project_iri = urllib.parse.quote_plus(project_iri) - bulkimport_api_url = "http://" + hostname + ":" + port + "/v1/resources/xmlimport/" + url_encoded_project_iri - headers = {"Content-Type": "application/xml"} - r = requests.post(bulkimport_api_url, data=self.get_xml_string(), headers=headers, auth=(user, password)) - return r.json() - - def add_resource(self, resclass: str, id: str, label: str, properties: Dict): - """ - - :param resclass: - :param id: - :param label: - :param properties: - :return: - """ - - def find_list_node_id(nodename: str, nodes: List): - """ - Finds a list node ID from the nodename in a (eventually hierarchical) list of nodes - - :param nodename: Name of the node - :param nodes: List of nodes - :return: the id of the list node (an IRI) - """ - for node in nodes: - if node["name"] == nodename: - return node["id"] - if node.get("nodes") is not None and len(node.get("nodes")) > 0: - node_id = find_list_node_id(nodename, node["nodes"]) - if node_id is not None: - return node_id - return None - - def process_properties(propinfo: Dict, valuestr: any): - """ - Processes a property in order to generate the approptiate XML for V1 bulk import. - - :param pname: property name - :param valuestr: value of the property - :return: Tuple with xml options and processed value: (xmlopt, val) - """ - switcher = { - 'TextValue': {'knoraType': 'richtext_value'}, - 'ColorValue': {'knoraType': 'color_value'}, - 'DateValue': {'knoraType': 'date_value'}, - 'DecimalValue': {'knoraType': 'decimal_value'}, - 'GeomValue': {'knoraType': 'geom_value'}, - 'GeonameValue': {'knoraType': 'geoname_value'}, - 'IntValue': {'knoraType': 'int_value'}, - 'BooleanValue': {'knoraType': 'boolean_value'}, - 'UriValue': {'knoraType': 'uri_value'}, - 'IntervalValue': {'knoraType': 'interval_value'}, - 'ListValue': {'knoraType': 'hlist_value'}, - 'LinkValue': {'knoraType': 'link_value'} - } - for link_otype in self.schema["link_otypes"]: - switcher[link_otype] = {'knoraType': 'link_value'} - xmlopt = switcher.get(propinfo["otype"]) - if xmlopt is None: - raise KnoraError("Did not find " + propinfo["otype"] + " in switcher!") - if xmlopt['knoraType'] == 'link_value': - xmlopt['target'] = str(valuestr) - if validators.url(str(valuestr)): - xmlopt['linkType'] = 'iri' - else: - xmlopt['linkType'] = 'ref' - value = None - elif propinfo["otype"] == 'ListValue': - if validators.url(str(valuestr)): - # it's a full IRI identifying the node - value = valuestr - else: - # it's only a node name. First let's get the node list from the ontology schema - list_id = propinfo["attr"]["hlist"] - for listname in self.schema["lists"]: - if self.schema["lists"][listname]["id"] == list_id: - nodes = self.schema["lists"][listname]["nodes"] - value = find_list_node_id(str(valuestr), nodes) - # if value == 'http://rdfh.ch/lists/0808/X6bb-JerQyu5ULruCGEO0w': - # print("BANG!") - # exit(0) - elif propinfo["otype"] == 'DateValue': - # processing and validating date format - res = re.match('(GREGORIAN:|JULIAN:)?(\d{4})?(-\d{1,2})?(-\d{1,2})?(:\d{4})?(-\d{1,2})?(-\d{1,2})?', - str(valuestr)) - if res is None: - raise KnoraError("Invalid date format! " + str(valuestr)) - dp = res.groups() - calendar = 'GREGORIAN:' if dp[0] is None else dp[0] - y1 = None if dp[1] is None else int(dp[1].strip('-: ')) - m1 = None if dp[2] is None else int(dp[2].strip('-: ')) - d1 = None if dp[3] is None else int(dp[3].strip('-: ')) - y2 = None if dp[4] is None else int(dp[4].strip('-: ')) - m2 = None if dp[5] is None else int(dp[5].strip('-: ')) - d2 = None if dp[6] is None else int(dp[6].strip('-: ')) - if y1 is None: - raise KnoraError("Invalid date format! " + str(valuestr)) - if y2 is not None: - date1 = y1 * 10000 - if m1 is not None: - date1 += m1 * 100 - if d1 is not None: - date1 += d1 - date2 = y2 * 10000 - if m2 is not None: - date2 += m2 * 100 - if d1 is not None: - date2 += d2 - if date1 > date2: - y1, y2 = y2, y1 - m1, m2 = m2, m1 - d1, d2 = d2, d1 - value = calendar + str(y1) - if m1 is not None: - value += f'-{m1:02d}' - if d1 is not None: - value += f'-{d1:02d}' - if y2 is not None: - value += f':{y2:04d}' - if m2 is not None: - value += f'-{m2:02d}' - if d2 is not None: - value += f'-{d2:02d}' - else: - value = str(valuestr) - return xmlopt, value - - if self.schema["resources"].get(resclass) is None: - raise KnoraError('Resource class is not defined in ontology!') - resnode = self.new_xml_element(self.proj_prefix + ':' + resclass, {'id': str(id)}) - - labelnode = self.new_xml_element('knoraXmlImport:label') - labelnode.text = str(label) - resnode.append(labelnode) - - for prop_info in self.schema["resources"][resclass]: - # first we check if the cardinality allows to add this property - if properties.get(prop_info["propname"]) is None: # this property-value is missing - if prop_info["card"] == 'cardinality' \ - and prop_info["cardval"] == 1: - raise KnoraError( - resclass + " requires exactly one " + prop_info["propname"] + "-value: none supplied!") - if prop_info["card"] == 'minCardinality' \ - and prop_info["cardval"] == 1: - raise KnoraError( - resclass + " requires at least one " + prop_info["propname"] + "-value: none supplied!") - continue - if type(properties[prop_info["propname"]]) is list: - if len(properties[prop_info["propname"]]) > 1: - if prop_info["card"] == 'maxCardinality' \ - and prop_info["cardval"] == 1: - raise KnoraError( - resclass + " allows maximal one " + prop_info["propname"] + "-value: several supplied!") - if prop_info["card"] == 'cardinality' \ - and prop_info["cardval"] == 1: - raise KnoraError( - resclass + " requires exactly one " + prop_info["propname"] + "-value: several supplied!") - for p in properties[prop_info["propname"]]: - xmlopt, value = process_properties(prop_info, p) - if xmlopt['knoraType'] == 'link_value': - pnode = self.new_xml_element(self.proj_prefix + ':' + prop_info["propname"]) - pnode.append(self.new_xml_element(self.proj_prefix + ':' + prop_info["otype"], xmlopt, value)) - else: - pnode = self.new_xml_element(self.proj_prefix + ':' + prop_info["propname"], xmlopt, value) - resnode.append(pnode) - else: - xmlopt, value = process_properties(prop_info, properties[prop_info["propname"]]) - if xmlopt['knoraType'] == 'link_value': - pnode = self.new_xml_element(self.proj_prefix + ':' + prop_info["propname"]) - pnode.append(self.new_xml_element(self.proj_prefix + ':' + prop_info["otype"], xmlopt, value)) - else: - pnode = self.new_xml_element(self.proj_prefix + ':' + prop_info["propname"], xmlopt, value) - resnode.append(pnode) - self.root.append(resnode) - - -class IrisLookup: - def __init__(self, local_id_to_iri_json): - self.iris = local_id_to_iri_json - - def get_resource_iri(self, local_id): - """ - Given the result of the bulk-import as json, allow retrieving the resource - IRI based on the local ID. - {'createdResources': [{'clientResourceID': 'LM_1', - 'label': '1', - 'resourceIri': 'http://rdfh.ch/0807/rNxoIK-oR_i0-lO21Y9-CQ'}, - {'clientResourceID': 'LM_2']} - - :param local_id: the local id. resulting JSON from a bulk import upload. - :return: - """ - - try: - resources = self.iris["createdResources"] - iri = "" - for resource in resources: - try: - res_id = resource["clientResourceID"] - if res_id == local_id: - iri = resource["resourceIri"] - else: - pass - except KeyError: - pass - - if iri == "": - return None - else: - return iri - except KeyError: - print("IrisLookup.get_resource_iri - 'createdResources' not found") - - def get_iris_json(self): - return self.iris - - -class ListsLookup: - def __init__(self, lists_json): - self.lists = lists_json - - def get_list_iri(self, listname): - return self.lists[listname]["id"] - - def get_list_node_iri(self, listname, nodename): - if nodename is not None: - nodes = self.lists[listname]["nodes"] - res = "" - for node in nodes: - try: - res = node[nodename]["id"] - except KeyError: - pass - - if res == "": - return None - else: - return res - else: - return None - - def get_lists_json(self): - return self.lists - - -if __name__ == '__main__': - con = Knora('http://localhost:3333') - con.login('root@example.com', 'test') - res = con.get_resource_by_label('Bertschy, Leon', - res_class="http://0.0.0.0:3333/ontology/0807/mls/v2#Lemma") - print('RES-IRI: ', res['@id']) - con.logout() diff --git a/test/e2e/BUILD.bazel b/test/e2e/BUILD.bazel index e8fe3dabd..90fc7fb33 100644 --- a/test/e2e/BUILD.bazel +++ b/test/e2e/BUILD.bazel @@ -94,6 +94,7 @@ py_test( name = "test_resource", srcs = ["test_resource.py"], deps = [ + "//knora/dsplib/models:bitstream", "//knora/dsplib/models:connection", "//knora/dsplib/models:resource", "//knora/dsplib/models:group", diff --git a/test/e2e/test_resource.py b/test/e2e/test_resource.py index bc34bbdc4..8202e1544 100644 --- a/test/e2e/test_resource.py +++ b/test/e2e/test_resource.py @@ -56,10 +56,17 @@ def test_Resource_create(self) -> None: file_ref = img['uploadedFiles'][0]['internalFilename'] res_perm = Permissions({PermissionValue.M: ["knora-admin:UnknownUser", "knora-admin:KnownUser"], PermissionValue.CR: ["knora-admin:Creator", "knora-admin:ProjectAdmin"]}) + + resource_bitstream = { + 'value': 'testdata/bitstreams/TEMP11.TIF', + 'internal_file_name': file_ref, + 'permissions': res_perm + } + thing_picture( con=self.con, label='ThingPicture', - bitstream=file_ref, + bitstream=resource_bitstream, permissions=res_perm, values={ 'anything:hasPictureTitle': make_value(value="A Thing Picture named Lena", permissions=res_perm) diff --git a/testdata/test-data.xml b/testdata/test-data.xml index 76fec6672..9aa64b61c 100644 --- a/testdata/test-data.xml +++ b/testdata/test-data.xml @@ -6,50 +6,45 @@ shortcode="4123" default-ontology="testonto"> - + RV V CR CR - D V - CR> - CR> - M> + CR + CR V V CR - CR> - D> + CR V CR CR - M - + id="obj_0000"> - Dies ist anderes TestThing + Dies ist ein TestThing ohne Angabe von permissions - Dies ist ein einfacher Text ohne Markup - Nochmals einen einfachen Text + Nochmals ein einfacher Text This is @@ -101,6 +96,7 @@ b2 + b2 obj_0000 diff --git a/testdata/test-id2iri-data.xml b/testdata/test-id2iri-data.xml index b06044b36..c1a9d24ac 100644 --- a/testdata/test-id2iri-data.xml +++ b/testdata/test-id2iri-data.xml @@ -6,7 +6,7 @@ shortcode="082E" default-ontology="rosetta"> - + V V diff --git a/testdata/test-id2iri-replaced.xml b/testdata/test-id2iri-replaced.xml index bf791dfdf..666a04caa 100644 --- a/testdata/test-id2iri-replaced.xml +++ b/testdata/test-id2iri-replaced.xml @@ -1,6 +1,6 @@ - + V V diff --git a/testdata/test-onto.json b/testdata/test-onto.json index 36f2e17eb..645513be0 100644 --- a/testdata/test-onto.json +++ b/testdata/test-onto.json @@ -84,10 +84,19 @@ ], "groups": [ { - "name": "testgroup", + "name": "testgroupEditors", "descriptions": { - "en": "Test group", - "de": "Testgruppe" + "en": "Test group editors", + "de": "Testgruppe Editors" + }, + "selfjoin": false, + "status": true + }, + { + "name": "testgroupViewers", + "descriptions": { + "en": "Test group viewers", + "de": "Testgruppe Viewers" }, "selfjoin": false, "status": true @@ -95,16 +104,51 @@ ], "users": [ { - "username": "tester", - "email": "tester@test.org", - "givenName": "Testing", - "familyName": "tester", + "username": "testerKnownUser", + "email": "tester.known@test.org", + "givenName": "Tester", + "familyName": "Known", + "password": "test0815", + "lang": "en" + }, + { + "username": "testerProjectEditor", + "email": "tester.projecteditor@test.org", + "givenName": "Tester", + "familyName": "Editor", "password": "test0815", "lang": "en", "groups": [ - ":testgroup" + ":testgroupEditors" ], - "projects": [] + "projects": [ + ":member" + ] + }, + { + "username": "testerProjectViewer", + "email": "tester.projectviewer@test.org", + "givenName": "Tester", + "familyName": "Viewer", + "password": "test0815", + "lang": "en", + "groups": [ + ":testgroupViewers" + ], + "projects": [ + ":member" + ] + }, + { + "username": "testerProjectAdmin", + "email": "tester.projectadmin@test.org", + "givenName": "Tester", + "familyName": "Project Admin", + "password": "test0815", + "lang": "en", + "projects": [ + ":admin" + ] } ], "ontologies": [ @@ -529,6 +573,65 @@ ] } ] + }, + { + "name": "testontoPermissions", + "label": "Test ontology permissions", + "properties": [ + { + "name": "hasText", + "super": [ + "hasValue" + ], + "object": "TextValue", + "labels": { + "en": "Text" + }, + "gui_element": "SimpleText", + "gui_attributes": { + "maxlength": 255, + "size": 80 + } + } + ], + "resources": [ + { + "name": "TestThingPermissions", + "super": "Resource", + "labels": { + "en": "Test thing for permissions without object" + }, + "comments": { + "en": "A thing to test permission related things", + "de": "Ein Ding um Permissionsdinge zu testen." + }, + "cardinalities": [ + { + "propname": ":hasText", + "gui_order": 1, + "cardinality": "0-n" + } + ] + }, + { + "name": "ImageThingPermissions", + "super": "StillImageRepresentation", + "labels": { + "en": "Test thing for permissions with object" + }, + "comments": { + "en": "A thing to test permission related things", + "de": "Ein Ding um Permissionsdinge zu testen." + }, + "cardinalities": [ + { + "propname": ":hasText", + "gui_order": 1, + "cardinality": "0-n" + } + ] + } + ] } ] } diff --git a/testdata/test-permissions.xml b/testdata/test-permissions.xml new file mode 100644 index 000000000..da424b9ea --- /dev/null +++ b/testdata/test-permissions.xml @@ -0,0 +1,158 @@ + + + + + + + RV + V + CR + CR + M + V + + + V + CR + CR + M + V + + + + V + V + CR + CR + M + V + + + V + CR + CR + M + V + + + + V + V + CR + CR + M + V + + + RV + RV + CR + CR + M + V + + + + + Dies ist ein einzelnes Textfeld (prop-restricted) + + + + + + Dies ist ein Textfeld (prop-restricted) + Dies ist ein Textfeld (prop-default) + + + + + + Dies ist ein Textfeld (prop-restricted) + Dies ist ein Textfeld (prop-default) + + + + + + Dies ist ein einzelnes Textfeld (no permissions) + + + + + TEMP11.TIF + + This is an ImageThing (prop-restricted) + + + + + TEMP11.TIF + + This is an ImageThing (prop-default) + + + + + TEMP11.TIF + + This is an ImageThing (prop-default) + This is an ImageThing (prop-restricted) + + + + + TEMP11.TIF + + This is an ImageThing (prop-default) + This is an ImageThing (prop-restricted) + + + + + TEMP11.TIF + + This is an ImageThing (prop-default) + This is an ImageThing (prop-restricted) + + + + + TEMP11.TIF + + This is an ImageThing (no permissions) + + + + diff --git a/testdata/tmp/_test-id2iri-replaced.xml b/testdata/tmp/_test-id2iri-replaced.xml index bf791dfdf..666a04caa 100644 --- a/testdata/tmp/_test-id2iri-replaced.xml +++ b/testdata/tmp/_test-id2iri-replaced.xml @@ -1,6 +1,6 @@ - + V V