diff --git a/knora/README.md b/knora/README.md deleted file mode 100644 index b223b5a9d..000000000 --- a/knora/README.md +++ /dev/null @@ -1,683 +0,0 @@ -# Knora Library - -This library offers classes and methods to manipulate a Knora based repository (DSP). Most importantly, it allows -importing data by providing methods to create new ontologies and resources. - -## Contents - -- [Basic methods](#basic-methods) - - [Knora()](#knora) - - [login()](#login) - - [logout](#logout) -- [Project methods()](#project-specific-methods) - - [get_existing_projects()](#get_existing_projects) - - [get_project()](#get_project) - - [project_exists()](#project_exists) - - [create_project()](#create_project) - - [update_project()](#update_project) - - [get_users()](#get_users) - - [create_user()](#create_user) - - [add_user_to_project()](#add_user_to_project) -- [Ontology methods](#ontology-methods) - - [get_existing_ontologies()](#get_existing_ontologies) - - [get_project_ontologies()](#get_project_ontologies) - - [ontology_exists()](#ontology_exists) - - [get_ontology_lastmoddate()](#get_ontology_lastmoddate) - - [create_ontology()](#create_ontology) - - [delete_ontology()](#delete_ontology) - - [get_ontology_graph()](#get_ontology_graph) - - [create_res_class()](#create_res_class) - - [create_property()](#create_property) - - [create_cardinality()](#create_cardinality) -- [Lists](#lists) - - [create_list_node()](#create_list_node) - - [get_lists()](#get_lists) - - [get_complete_list()](#get_complete_list) -- [Creating instances](#creating-instances) - - [create_resource()](#create_resource) - - [values parameter](#values-parameter) - - [Stillimage parameter](#stillimage-parameter) -- [Other methods](#other-methods) - - [create_schema()](#create_schema) - - [reset_triplestore_content()](#reset_triplestore_content) -- [SIPI](#sipi) - - [Sipi()](#Sipi) - - [upload_image()](#upload_image) -- [BulkImport](#bulkImport) - - [Bulkimport()](#bulkimport) - - [add_resource()](#add_resource) - - [upload()](#upload) - -## Knora - -Knora is the base class which handles all the communication with the DSP API. In order to successfully use the API, -credentials to the DSP API have to be submitted. - -### Basic methods - -#### Knora() - -Creates an access object for the DSP API. - -```python -Knora( - server: str, - prefixes: Dict[str, str] = None) -``` - -- server: URL of the DSP server -- prefixes: List of extra RDF prefixes that are being used - -Example: - -``` -con = Knora(args.server) -``` - -#### login() - -Logs in the user and stores the access token. - -```python -login(email: str, password: str) -``` - -- email: Email address of user -- password: Password of user - -Example: - -``` -con.login(args.user, args.password) -``` - -#### logout() - -Logs out the user and removes the access token. - -```python -logout() -``` - -Example: - -``` -con.logout() -``` - -### Project specific methods - -The following methods are used to add and modify projects and users. - -#### get_existing_projects() - -Returns a list of all projects. By default, only the project's IRI are returned. - -```python -get_existing_projects(full: bool = False) -``` - -- full: if False (default), only the project's IRI are returned, if True, more information about the projects are - returned - -#### get_project() - -Returns information about a project given its shortcode. - -```python -get_project(shortcode: str) -> dict -``` - -#### project_exists() - -Returns `True` if a project, given by its IRI, exists in the repository, `False` otherwise. - -```python -project_exists(proj_iri: str) -> bool -``` - -- proj_iri: IRI of the project - -#### create_project() - -This method is used to create a new project. - -```python -create_project( - shortcode: str, - shortname: str, - longname: str, - descriptions: Optional[Dict[str, str]] = None, - keywords: Optional[List[str]] = None, - logo: Optional[str] = None) -> str -``` - -- shortcode: The shortcode of the project -- shortname: The short name of the project -- longname: The full name of the project -- descriptions: [optional] Dict with the language code (e.g. "en", "de") as key and the actual description as value -- keywords: [optional] List of keywords describing the project -- logo: [optional] Path to logo file (the logo has to be uploaded before it can be referenced) - -#### update_project() - -Modifies existing project data. All parameters except the shortcode (which cannot be changed) are optional. - -```python -update_project( - shortcode: str, - shortname: str, - longname: str, - descriptions: Optional[Dict[str, str]] = None, - keywords: Optional[List[str]] = None, - logo: Optional[str] = None) -> str -``` - -- shortcode: The shortcode of the project -- shortname: The short name of the project -- longname: The full name of the project -- descriptions: [optional] Dict with the language code (e.g. "en", "de") as key and the actual description as value -- keywords: [optional] List of keywords describing the project -- logo: [optional] Path to logo file (the logo has to be uploaded before it can be referenced) - -#### get_users() - -Returns a list of all users. - -``` -get_users() -``` - -#### get_user() - -Returns information about a specific user given its IRI. - -```python -get_user(user_iri: str) -``` - -- user_iri: The IRI of the user - -#### create_user() - -Creates a new user and returns its IRI. - -```python -create_user( - username: str, - email: str, - givenName: str, - familyName: str, - password: str, - lang: str = "en") -``` - -- username: Username -- email: Email address of user – is used to identify the user -- givenName: First name of the user -- familyName: Family name of the user -- password: Password -- lang: [optional] Language code of default language - -#### add_user_to_project() - -Adds an existing user to a project. - -```python -add_user_to_project( - user_iri: str, - project_iri: str) -``` - -- user_iri: IRI of the user -- project_iri: IRI of the project the user should have access to - -### Ontology methods - -Since several instances could modify an ontology at the same time, a simple mechanism has been implemented to avoid race -conditions: -A modification of an ontology requires the timestamp of its last modification. If the ontology has been modified before -an update is submitted, the submitted modification timestamp will not fit and the modification is rejected. Every -manipulation of an ontology returns its new last modification timestamp. - -Many ontology methods need the ontology IRI *and* the ontology name. The reason for this is better performance. With the -IRI and the name, round trips to the DSP backend are avoided. - -#### get_existing_ontologies() - -Returns information about all existing ontologies. - -```python -get_existing_ontologies() -``` - -#### get_project_ontologies() - -Returns the ontologies of a project given the project's IRI. - -```python -get_project_ontologies(project_code: str) -> Optional[dict] -``` - -- project_code: Shortcode of project - -#### ontology_exists() - -Returns `True` if the ontology, given by its IRI, exists on the DSP. - -```python -ontology_exists(onto_iri: str) -``` - -- onto_iri: IRI of the ontology - -#### get_ontology_lastmoddate() - -Returns the last modification date of a given ontology. - -```python -get_ontology_lastmoddate(self, onto_iri: str) -``` - -- onto_iri: IRI of the ontology - -#### create_ontology() - -Creates a new ontology. - -```python -create_ontology( - onto_name: str, - project_iri: str, - label: str -) -> Dict[str, str] -``` - -- onto_name: Name of the ontology -- project_iri: IRI of the project the ontology belongs to -- returns: Dict with the ontologies IRI and a timestamp of its last modification - -#### delete_ontology() - -Deletes the given ontology. - -```python -delete_ontology( - onto_iri: str - last_onto_date = None) -``` - -- onto_iri: IRI of the ontology -- last_onto_date: Timestamp of the last modification of the ontology. It cannot be deleted if the given timestamp - doesn't fit. - -#### get_ontology_graph() - -Returns a given ontology as RDF (text/turtle). - -```python -get_ontology_graph( - shortcode: str, - name: str) -``` - -- shortcode: Shortcode of the project -- name: Name of the ontology - -#### create_res_class() - -Creates a new resource class in the ontology. - -```python -create_res_class( - onto_iri: str, - onto_name: str, - last_onto_date: str, - class_name: str, - super_class: List[str], - labels: Dict[str, str], - comments: Optional[Dict[str, str]] = None -) -> Dict[str, str] -``` - -- onto_iri: IRI of the ontology the new resource class should be added to -- onto_name: Name of the ontology -- last_onto_date: Timestamp of the ontology's last modification -- class_name: Name of the new class -- super_class: A list of super classes this class is derived from. Usually, this is just "Resource" for "knora-api: - Resource" or another resource class including the prefix. -- labels: Dict with the language specific labels, e.g. `{"en": "English label", "de": "German label"}` -- comments: [optional] Dict with language specific comments about the resource class - -#### create_property() - -Create a new property. - -```python -create_property( - onto_iri: str, - onto_name: str, - last_onto_date: str, - prop_name: str, - super_props: List[str], - labels: Dict[str, str], - gui_element: str, - gui_attributes: List[str] = None, - subject: Optional[str] = None, - object: Optional[str] = None, - comments: Optional[Dict[str, str]] = None -) -> Dict[str, str] -``` - -- onto_iri: IRI of the ontology the new resource class should be added to -- onto_name: Name of the ontology -- last_onto_date: Timestamp of the ontology's last modification -- prop_name: Name of the property to be created -- super_props: List of super-properties, must be at least `[knora-api:hasValue]`. But it is possible to declare a - property to be a descendant from another property from another ontology or from an external ontology ( - e.g. `foaf:familyName`). Please note that all used prefixes have to be declared at the construction of the Knora - instance. -- labels: Dict with the language specific labels, e.g. `{"en": "English label", "de": "German label"}` -- gui_element: GUI element that should be used. Supported GUI elements are: - - salsah-gui:Colorpicker (attributes: ncolors='integer') - - salsah-gui:Date - - salsah-gui:Geometry - - salsah-gui:Geonames - - salsah-gui:Interval - - salsah-gui:List (attributes [required]: hlist='iri') - - salsah-gui:Pulldown (attributes [required]: hlist='iri') - - salsah-gui:Radio (attributes [required]: hlist='iri') - - salsah-gui:Richtext - - salsah-gui:Searchbox (attributes: numprops='integer') - - salsah-gui:SimpleText (attributes: maxlength='integer', size='integer') - - salsah-gui:Slider (attributes [required]: max='decimal', min='decimal') - - salsah-gui:Spinbox (attributes: max='decimal', min='decimal') - - salsah-gui:Textarea (attributes: cols='integer', rows='integer', width='percent'%, wrap="soft"|"hard") - - salsah-gui:Checkbox - - salsah-gui:Fileupload -- gui_attributes: List of Gui attributes -- subject: [optional] The resource the property belongs to. If a resource is used by several resources, this field can - be left empty. -- object: The value type the property points to. The following object types are supported by DSP: - - salsah-gui:TextValue (gui_element: salsah-gui:SimpleText, salsah-gui:TextArea) A simple text value - - salsah-gui:ColorValue (gui_element: salsah-gui:Colorpicker) A web color - - salsah-gui:DateValue (gui_elment: Date) A Knora calendar date - - salsah_gui:DecimalValue (gui_element: salsah-gui:SimpleText) A Decimal value - - salsah_gui:GeomValue (gui_element: salsah-gui:Geometry) A geometry object (region in an image) - - salsah_gui:GeonameValue (gui_element: salsah-gui:Geonames) A geographical location identified by a geonames.org - code - - salsah_gui:IntValue (gui_element: salsah-gui:SimpleText, salsah-gui:Slider, salsah-gui:Spinbox) An integer value - - salsah_gui:BooleanValue (gui_element: salsah-gui:Checkbox) A boolean value - - salsah_gui:UriValue (gui_element: salsah-gui:SimpleText) A URI - - salsah_gui:IntervalValue (gui_element: No Yet Implemented) An interval or time span - - salsah_gui:ListValue (gui_element: salsah-gui:Pulldown, salsah-gui:Radio, salsah-gui:List) An entry from a list -- comments: [optional] Dict with language specifics comments about the resource class - -#### create_cardinality() - -Associates a property with a given resource and indicates the cardinality restriction. - -```python -create_cardinality( - onto_iri: str, - onto_name: str, - last_onto_date: str, - class_iri: str, - prop_iri: str, - occurrence: str -) -> Dict[str, str] -``` - -- onto_iri: IRI of the ontology the new resource class should be added to -- onto_name: Name of the ontology -- last_onto_date: Timestamp of the ontology's last modification -- class_iri: IRI of the resource class the property should be associated with -- prop_iri: IRI of the property to be associated with the resource class -- occurrence: must be one of: "1", "0-1", "0-n" or "1-n" (as string) - -### Lists - -Lists (flat or hierarchical) are ordered named nodes that can be used as controlled vocabularies, selections or -hierarchical thesauri. A list has a root node where the list's name is defined. Subnodes represent the list elements. In -case of hierarchical lists, a subnode may hold further subnodes (recursively). Lists are always associated with a -project. - -#### create_list_node() - -Creates a list node (root or subnode, depending on parameters). - -```python -create_list_node( - project_iri: str, - labels: Dict[str, str], - comments: Optional[Dict[str, str]] = None, - name: Optional[str] = None, - parent_iri: Optional[str] = None -) -> str -``` - -- project_iri: The IRI of the project the list is associated with -- labels: Language dependent labels, e.g. `{"en": "yes", "de": "nein"}` -- comments: Language dependent comments to the node -- parent_iri: If `None`, it is a root node, otherwise the IRI of the parent node has to be provided (this could also be - the root node) - -#### get_lists() - -Returns information about the lists of a given project. - -```python -get_lists( - shortcode: str -) -> Dict -``` - -- shortcode: Shortcode of the project. - -#### get_complete_list() - -Returns all nodes of a specific list. - -```python -get_complete_list( - list_iri: str -) -> Dict -``` - -- list_iri: IRI of the list - -### Creating instances - -In order to fill the repository with data, resource instances have to be created. - -#### create_resource() - -Creates a new instance of a given resource. All parameters are required. - -```python -create_resource( - schema: Dict, - res_class: str, - label: str, - values: Dict, - stillimage = None -) -> Dict -``` - -- schema: The schema is a Dict that contains information about the ontology. It will be created by calling the method - `create_schema()`, e.g. `schema = con.create_schema(args.projectcode, args.ontoname)`. It is used to validate the data - supplied to `create_resource()` for consistency with the ontology. -- res_class: IRI of the resource class that should be instantiated -- label: A string describing the instance -- values: A Dict describing all the values. See [example of values parameter](#values-parameter). -- stillimage: In case the resource is a descendant of `StillImageRepresentation`, this parameter contains the file path. - See - [example of still image parameter](#still-image-parameter). - -##### Values parameter - -The values are a Dict in the form: - -```python -{ - "propertyname": value, - "propertyname": value, - ... -} -``` - -If the property belongs to the same ontology as the resource, the prefix should be omitted. The value can either be a -string or a Dict of the following form: - -```python -{ - "value": "the value", - "comment": "a comment to the value" -} -``` - -_Note_: If the value is text with markup (only standard mapping allowed), the value must be an instance -of `KnoraStandoffXml`, e.g.: - -```python - ... -"richtextprop": { - 'value': KnoraStandoffXml( - "\n

this is text

with standoff
"), - 'comment': "Text with Markup" -}, -... -``` - -##### Still image parameter: - -A still image has to be uploaded to DSP before its `StillImageRepresentation` instance can be created: - -```python -sipi = Sipi(args.sipi, con.get_token()) -img = sipi.upload_image("example.tif") -... -an_image = con.create_resource(schema, - "object2", - { - "titleprop": "Stained glass", - "linkprop": inst1_info['iri'] - }, - img['uploadedFiles'][0]['internalFilename'] - ) -``` - -### Other methods - -#### create_schema() - -Gets the ontology information from DSP API and converts it into a python Dict that can be used for further processing. -It is used by the bulk import and the `create_resource()` method. - -```python -create_schema( - shortcode: str, - shortname: str -) -> Dict -``` - -- shortcode: Shortcode of the ontology -- shortname: Short name of the ontology - -#### reset_triplestore_content() - -Empties the triple store. _Use with uttermost caution._ Used on (local) test instances to clean up the (local) backend. - -## SIPI - -Sipi is the IIIF conformant image backend. - -### Methods of Sipi class - -#### Sipi() - -Creates a SIPI instance. - -```python -Sipi( - sipiserver: str, - token: str -) -``` - -- sipiserver: URL of the SIPI server/endpoint -- token: Access token from DSP API - -Example: - -```python -sipi = Sipi(args.sipi, con.get_token()) -``` - -#### upload_image() - -Uploads an image to the SIPI server. - -``` -upload_image( - filepath: str -) -> Dict -``` - -- filepath: Path to the file on the local filesystem. J2K, TIF, JPEG and PNG images are supported. - -## Bulk import - -The bulk import functionality is used to import multiple resources at once. - -### BulkImport - -Handles the bulk import. - -#### Bulkimport() - -Creates a Bulkimport instance - -```python -Bulkimport( - schema: Dict -) -> Bulkimport -``` - -- schema: See `create_schema()` - -#### add_resource() - -Adds an instance to the bulk import. - -```python -add_resource( - resclass: str, - id: str, - label: str, - properties: Dict -) -``` - -- resclass: Name of the resource class (prefix may be omitted) -- id: A user defined (internal) ID that can be used to reference this resource later during the bulk import -- label: String describing the instance -- properties: Dict of property name/value pairs - -#### upload() - -Uploads the data using the bulk import route of the DSP API. - -```python -upload( - user: str, - password: str, - hostname: str, - port: int -) -``` - -- user: Username -- password: Password -- hostname: Hostname of the DSP backend -- port: Port number the DSP backend uses diff --git a/knora/dsp_tools.py b/knora/dsp_tools.py index 6fe6dfcc0..ee4dd407d 100644 --- a/knora/dsp_tools.py +++ b/knora/dsp_tools.py @@ -163,7 +163,7 @@ def program(user_args: list[str]) -> None: create_ontology(input_file=args.datamodelfile, lists_file=args.listfile, server=args.server, - user=args.user, + user_mail=args.user, password=args.password, verbose=args.verbose, dump=args.dump if args.dump else False) diff --git a/knora/dsplib/models/bitstream.py b/knora/dsplib/models/bitstream.py index 22b95027f..ff515abaf 100644 --- a/knora/dsplib/models/bitstream.py +++ b/knora/dsplib/models/bitstream.py @@ -1,4 +1,4 @@ -from typing import Dict, Optional, Any +from typing import Optional, Any from pystrict import strict @@ -28,7 +28,7 @@ def value(self) -> str: def permissions(self) -> Optional[Permissions]: return self._permissions - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = {} if action == Actions.Create: tmp["knora-api:fileValueHasFilename"] = self._value diff --git a/knora/dsplib/models/connection.py b/knora/dsplib/models/connection.py index 8babf45cd..5c74c68bb 100644 --- a/knora/dsplib/models/connection.py +++ b/knora/dsplib/models/connection.py @@ -1,5 +1,5 @@ import json -from typing import Dict, Optional, Union +from typing import Optional, Union import requests from pystrict import strict @@ -19,11 +19,11 @@ class Connection: """ _server: str - _prefixes: Union[Dict[str, str], None] + _prefixes: Union[dict[str, str], None] _token: Union[str, None] _log: bool - def __init__(self, server: str, prefixes: Dict[str, str] = None): + def __init__(self, server: str, prefixes: dict[str, str] = None): """ Constructor requiring the server address, the user and password of KNORA :param server: Address of the server, e.g https://api.dasch.swiss @@ -153,12 +153,12 @@ def post(self, path: str, jsondata: Optional[str] = None): tmp = path.split('/') filename = "POST" + "_".join(tmp) + ".json" with open(filename, 'w') as f: - json.dump(logobj, f, indent=3) + json.dump(logobj, f, indent=4) self.on_api_error(req) result = req.json() return result - def get(self, path: str, headers: Optional[Dict[str, str]] = None): + def get(self, path: str, headers: Optional[dict[str, str]] = None): """ Get data from a server using a HTTP GET request :param path: Path of RESTful route diff --git a/knora/dsplib/models/group.py b/knora/dsplib/models/group.py index 7209dc317..5887461d8 100644 --- a/knora/dsplib/models/group.py +++ b/knora/dsplib/models/group.py @@ -1,10 +1,12 @@ +from __future__ import annotations + import json -from typing import List, Optional, Any, Union +from typing import Optional, Any, Union from urllib.parse import quote_plus from pystrict import strict -from knora.dsplib.models.langstring import LangString, Languages +from knora.dsplib.models.langstring import LangString from .connection import Connection from .helpers import Actions, BaseError from .model import Model @@ -211,7 +213,7 @@ def toJsonObj(self, action: Actions): tmp['selfjoin'] = self._selfjoin return tmp - def create(self): + def create(self) -> Group: jsonobj = self.toJsonObj(Actions.Create) jsondata = json.dumps(jsonobj) result = self._con.post(Group.ROUTE, jsondata) @@ -238,14 +240,16 @@ def delete(self): return Group.fromJsonObj(self._con, result['group']) @staticmethod - def getAllGroups(con: Connection) -> List['Group']: - result = con.get(Group.ROUTE) - if 'groups' not in result: - raise BaseError("Request got no groups!") - return list(map(lambda a: Group.fromJsonObj(con, a), result['groups'])) + def getAllGroups(con: Connection) -> Optional[list[Group]]: + try: + result = con.get(Group.ROUTE) + return [Group.fromJsonObj(con, group_item) for group_item in result["groups"]] + except BaseError: + # return None if no groups are found or an error happened + return None @staticmethod - def getAllGroupsForProject(con: Connection, proj_shortcode: str) -> List['Group']: + def getAllGroupsForProject(con: Connection, proj_shortcode: str) -> list[Group]: result = con.get(Group.ROUTE) if 'groups' not in result: raise BaseError("Request got no groups!") @@ -265,7 +269,7 @@ def createDefinitionFileObj(self): } return group - def print(self): + def print(self) -> None: print('Group Info:') print(' Id: {}'.format(self._id)) print(' Name: {}'.format(self._name)) diff --git a/knora/dsplib/models/helpers.py b/knora/dsplib/models/helpers.py index 615f0bdca..25b7a4acd 100644 --- a/knora/dsplib/models/helpers.py +++ b/knora/dsplib/models/helpers.py @@ -3,7 +3,7 @@ from dataclasses import dataclass from enum import Enum, unique from traceback import format_exc -from typing import NewType, List, Dict, Optional, Any, Tuple, Union, Pattern +from typing import NewType, Optional, Any, Tuple, Union, Pattern from pystrict import strict @@ -23,7 +23,7 @@ class OntoInfo: hashtag: bool -ContextType = NewType("ContextType", Dict[str, OntoInfo]) +ContextType = NewType("ContextType", dict[str, OntoInfo]) def LINE() -> int: @@ -86,7 +86,7 @@ class Cardinality(Enum): @strict class ContextIterator: _context: 'Context' - _prefixes: List[str] + _prefixes: list[str] _index: int def __init__(self, context: 'Context'): @@ -111,10 +111,10 @@ class Context: This class holds a JSON-LD context with the ontology IRI's and the associated prefixes """ _context: ContextType - _rcontext: Dict[str, str] + _rcontext: dict[str, str] _exp: Pattern[str] - common_ontologies = ContextType({ + common_ontologies = ContextType({ "foaf": OntoInfo("http://xmlns.com/foaf/0.1/", False), "dc": OntoInfo("http://purl.org/dc/elements/1.1/", False), "dcterms": OntoInfo("http://purl.org/dc/terms/", False), @@ -148,7 +148,7 @@ def __is_iri(self, val: str) -> bool: m = self._exp.match(val) return m.span()[1] == len(val) if m else False - def __init__(self, context: Optional[Dict[str, str]] = None): + def __init__(self, context: Optional[dict[str, str]] = None): """ THe Constructor of the Context. It takes one optional parameter which as a dict of prefix - ontology-iri pairs. If the hashtag "#" is used to append element name, the @@ -158,17 +158,17 @@ def __init__(self, context: Optional[Dict[str, str]] = None): # regexp to test for a complete IRI (including fragment identifier) self._exp = re.compile("^(http)s?://([\\w\\.\\-~]+)?(:\\d{,6})?(/[\\w\\-~]+)*(#[\\w\\-~]*)?") self._context = ContextType({}) - + # add ontologies from context, if any if context: for prefix, onto in context.items(): self._context[prefix] = OntoInfo(onto.removesuffix('#'), onto.endswith('#')) - + # add standard ontologies (rdf, rdfs, owl, xsl) for k, v in self.base_ontologies.items(): if not self._context.get(k): self._context[k] = v - + # add DSP-API internal ontologies (knora-api, salsah-gui) for k, v in self.knora_ontologies.items(): if not self._context.get(k): @@ -224,7 +224,7 @@ def context(self, value: ContextType) -> None: raise BaseError("Error in parameter to context setter") @property - def rcontext(self) -> Dict[str, str]: + def rcontext(self) -> dict[str, str]: return self._rcontext def add_context(self, prefix: str, iri: Optional[str] = None) -> None: @@ -400,7 +400,7 @@ def reduce_iri(self, iristr: str, ontoname: Optional[str] = None) -> str: else: return iristr - def toJsonObj(self) -> Dict[str, str]: + def toJsonObj(self) -> dict[str, str]: """ Return a python object that can be jsonfied... :return: Object to be jsonfied @@ -408,7 +408,7 @@ def toJsonObj(self) -> Dict[str, str]: return {prefix: oinfo.iri + '#' if oinfo.hashtag else oinfo.iri for prefix, oinfo in self._context.items()} - def get_externals_used(self) -> Dict[str, str]: + def get_externals_used(self) -> dict[str, str]: exclude = ["rdf", "rdfs", "owl", "xsd", "knora-api", "salsah-gui"] return {prefix: onto.iri for prefix, onto in self._context.items() if prefix not in exclude} @@ -487,7 +487,7 @@ class WithId: """ _tmp: str = None - def __init__(self, obj: Optional[Dict[str, str]]): + def __init__(self, obj: Optional[dict[str, str]]): if obj is None: return self._tmp = obj.get('@id') diff --git a/knora/dsplib/models/langstring.py b/knora/dsplib/models/langstring.py index c5bcdaada..72403d830 100644 --- a/knora/dsplib/models/langstring.py +++ b/knora/dsplib/models/langstring.py @@ -1,5 +1,5 @@ from enum import Enum, unique -from typing import List, Dict, Tuple, Optional, Any, Union +from typing import Tuple, Optional, Any, Union from ..models.helpers import BaseError @@ -12,7 +12,7 @@ class Languages(Enum): IT = 'it' -LangStringParam = Optional[Union[Dict[Union[Languages, str], str], str]] +LangStringParam = Optional[Union[dict[Union[Languages, str], str], str]] class LangStringIterator: @@ -53,7 +53,7 @@ class LangString: "some:thing": "a string without language specificer" ``` """ - _langstrs: Dict[Languages, str] + _langstrs: dict[Languages, str] _simplestring: str def __init__(self, initvalue: LangStringParam = None): @@ -204,7 +204,7 @@ def toJsonLdObj(self): # return list(map(lambda a: {'@language': a[0].value, '@value': a[1]}, self._langstrs.items())) @classmethod - def fromJsonLdObj(cls, obj: Optional[Union[List[Dict[str, str]], str]]) -> 'LangString': + def fromJsonLdObj(cls, obj: Optional[Union[list[dict[str, str]], str]]) -> 'LangString': if obj is None: return None if isinstance(obj, str): @@ -213,7 +213,7 @@ def fromJsonLdObj(cls, obj: Optional[Union[List[Dict[str, str]], str]]) -> 'Lang objs = obj else: objs = [obj] - lstrs: Dict[Languages, str] = {} + lstrs: dict[Languages, str] = {} for o in objs: lang = o.get('@language') if lang == 'en': @@ -239,7 +239,7 @@ def fromJsonObj(cls, obj: Optional[Any]) -> 'LangString': objs = obj else: objs = [obj] - lstrs: Dict[Languages, str] = {} + lstrs: dict[Languages, str] = {} for o in objs: lang = o.get('language') if lang == 'en': @@ -268,7 +268,7 @@ def print(self, offset: Optional[int] = None): def langstrs(self): return self._langstrs - def createDefinitionFileObj(self) -> Union[str, Dict[str, str]]: + def createDefinitionFileObj(self) -> Union[str, dict[str, str]]: if self._simplestring: return self._simplestring langstring = {} diff --git a/knora/dsplib/models/listnode.py b/knora/dsplib/models/listnode.py index 5b203db26..97a052c3c 100644 --- a/knora/dsplib/models/listnode.py +++ b/knora/dsplib/models/listnode.py @@ -38,9 +38,8 @@ def list_creator(con: Connection, project: Project, parent_node: 'ListNode', - nodes: List[dict]) -> List['ListNode']: - - nodelist: List[ListNode] = [] + nodes: list[dict]) -> list['ListNode']: + nodelist: list[ListNode] = [] for node in nodes: new_node = ListNode( @@ -96,7 +95,7 @@ class ListNode(Model): isRootNode : bool Is True if the ListNode is the root node of a list. Cannot be set [read]. - children : List[ListNode] + children : list[ListNode] Contains a list of child nodes. This attribute is only available for nodes that have been read by the method "getAllNodes()" [read]. @@ -141,7 +140,7 @@ class ListNode(Model): _name: Optional[str] _parent: Optional[str] _isRootNode: bool - _children: Optional[List['ListNode']] + _children: Optional[list['ListNode']] _rootNodeIri: Optional[str] def __init__(self, @@ -153,7 +152,7 @@ def __init__(self, name: Optional[str] = None, parent: Optional[Union['ListNode', str]] = None, isRootNode: bool = False, - children: Optional[List['ListNode']] = None, + children: Optional[list['ListNode']] = None, rootNodeIri: Optional[str] = None): """ This is the constructor for the ListNode object. For @@ -312,18 +311,18 @@ def isRootNode(self, value: bool) -> None: raise BaseError('Property isRootNode cannot be set!') @property - def children(self) -> Optional[List['ListNode']]: + def children(self) -> Optional[list['ListNode']]: return self._children @children.setter - def children(self, value: List['ListNode']) -> None: + def children(self, value: list['ListNode']) -> None: self._children = value @staticmethod def __getChildren(con: Connection, parent_iri: str, project_iri: str, - children: List[Any]) -> Optional[List['ListNode']]: + children: list[Any]) -> Optional[list['ListNode']]: """ Internal method! Should not be used directly! @@ -334,7 +333,7 @@ def __getChildren(con: Connection, :return: List of ListNode instances """ if children: - child_nodes: List[Any] = [] + child_nodes: list[Any] = [] for child in children: if 'parentNodeIri' not in child: @@ -530,7 +529,7 @@ def getAllNodes(self) -> 'ListNode': return root @staticmethod - def getAllLists(con: Connection, project_iri: Optional[str] = None) -> List['ListNode']: + def getAllLists(con: Connection, project_iri: Optional[str] = None) -> list['ListNode']: """ Get all lists. If a project IRI is given, it returns the lists of the specified project @@ -546,7 +545,7 @@ def getAllLists(con: Connection, project_iri: Optional[str] = None) -> List['Lis raise BaseError("Request got no lists!") return list(map(lambda a: ListNode.fromJsonObj(con, a), result['lists'])) - def _createDefinitionFileObj(self, children: List["ListNode"]): + def _createDefinitionFileObj(self, children: list["ListNode"]): """ Create an object that corresponds to the syntax of the input to "create_onto". Node: This method must be used only internally (for recursion)!! diff --git a/knora/dsplib/models/model.py b/knora/dsplib/models/model.py index 8521d34b8..186df4de8 100644 --- a/knora/dsplib/models/model.py +++ b/knora/dsplib/models/model.py @@ -1,12 +1,10 @@ -from typing import Set - from .connection import Connection from .helpers import BaseError class Model: _con: Connection - _changed: Set[str] + _changed: set[str] def __init__(self, con: Connection): if not isinstance(con, Connection): diff --git a/knora/dsplib/models/ontology.py b/knora/dsplib/models/ontology.py index a24d4c3fd..811bfac03 100644 --- a/knora/dsplib/models/ontology.py +++ b/knora/dsplib/models/ontology.py @@ -1,6 +1,6 @@ import copy import json -from typing import List, Tuple, Optional, Any, Union +from typing import Tuple, Optional, Any, Union from urllib.parse import quote_plus from pystrict import strict @@ -54,7 +54,6 @@ def default(self, obj): @strict class Ontology(Model): - ROUTE: str = '/v2/ontologies' METADATA: str = '/metadata/' ALL_LANGUAGES: str = '?allLanguages=true' @@ -65,10 +64,10 @@ class Ontology(Model): _label: str _comment: str _lastModificationDate: LastModificationDate - _resource_classes: List[ResourceClass] - _property_classes: List[PropertyClass] + _resource_classes: list[ResourceClass] + _property_classes: list[PropertyClass] _context: Context - _skiplist: List[str] + _skiplist: list[str] def __init__(self, con: Connection, @@ -78,8 +77,8 @@ def __init__(self, label: Optional[str] = None, comment: Optional[str] = None, lastModificationDate: Optional[Union[str, LastModificationDate]] = None, - resource_classes: List[ResourceClass] = [], - property_classes: List[PropertyClass] = [], + resource_classes: list[ResourceClass] = [], + property_classes: list[PropertyClass] = [], context: Context = None): super().__init__(con) self._id = id @@ -152,11 +151,11 @@ def lastModificationDate(self, value: Union[str, LastModificationDate]): self._lastModificationDate = LastModificationDate(value) @property - def resource_classes(self) -> List[ResourceClass]: + def resource_classes(self) -> list[ResourceClass]: return self._resource_classes @resource_classes.setter - def resource_classes(self, value: List[ResourceClass]) -> None: + def resource_classes(self, value: list[ResourceClass]) -> None: self._resource_classes = value def addResourceClass(self, resourceclass: ResourceClass, create: bool = False) -> Tuple[int, ResourceClass]: @@ -181,11 +180,11 @@ def removeResourceClass(self, index: int, erase: bool = False) -> None: del self._resource_classes[index] @property - def property_classes(self) -> List[PropertyClass]: + def property_classes(self) -> list[PropertyClass]: return self._property_classes @property_classes.setter - def property_classes(self, value: List[PropertyClass]): + def property_classes(self, value: list[PropertyClass]): self._property_classes = value def addPropertyClass(self, propclass: PropertyClass, create: bool = False) -> Tuple[int, ResourceClass]: @@ -323,7 +322,7 @@ def __oneOntologiesFromJsonObj(cls, con: Connection, json_obj: Any, context: Con context=context2) @classmethod - def allOntologiesFromJsonObj(cls, con: Connection, json_obj: Any) -> List['Ontology']: + def allOntologiesFromJsonObj(cls, con: Connection, json_obj: Any) -> list['Ontology']: context = Context(json_obj.get('@context')) rdf = context.prefix_from_iri("http://www.w3.org/1999/02/22-rdf-syntax-ns#") rdfs = context.prefix_from_iri("http://www.w3.org/2000/01/rdf-schema#") @@ -331,7 +330,7 @@ def allOntologiesFromJsonObj(cls, con: Connection, json_obj: Any) -> List['Ontol xsd = context.prefix_from_iri("http://www.w3.org/2001/XMLSchema#") knora_api = context.prefix_from_iri("http://api.knora.org/ontology/knora-api/v2#") salsah_gui = context.prefix_from_iri("http://api.knora.org/ontology/salsah-gui/v2#") - ontos: List['Ontology'] = [] + ontos: list['Ontology'] = [] if json_obj.get('@graph') is not None: for o in json_obj['@graph']: ontos.append(Ontology.__oneOntologiesFromJsonObj(con, o, context)) @@ -404,12 +403,12 @@ def delete(self) -> Optional[str]: return result.get('knora-api:result') @staticmethod - def getAllOntologies(con: Connection) -> List['Ontology']: + def getAllOntologies(con: Connection) -> list['Ontology']: result = con.get(Ontology.ROUTE + Ontology.METADATA) return Ontology.allOntologiesFromJsonObj(con, result) @staticmethod - def getProjectOntologies(con: Connection, project_id: str) -> List['Ontology']: + def getProjectOntologies(con: Connection, project_id: str) -> list['Ontology']: if project_id is None: raise BaseError('Project ID must be defined!') result = con.get(Ontology.ROUTE + Ontology.METADATA + quote_plus(project_id) + Ontology.ALL_LANGUAGES) diff --git a/knora/dsplib/models/permission.py b/knora/dsplib/models/permission.py index f40fbabfd..4a0218f9c 100644 --- a/knora/dsplib/models/permission.py +++ b/knora/dsplib/models/permission.py @@ -1,6 +1,6 @@ import re from enum import Enum, unique -from typing import List, Dict, Optional, Union +from typing import Optional, Union from pystrict import strict @@ -27,7 +27,7 @@ def __str__(self): @strict class PermissionsIterator: _permissions: 'Permissions' - _group: List[str] + _group: list[str] _index: int def __init__(self, permissions: 'Permissions'): @@ -47,19 +47,19 @@ def __next__(self): @strict class Permissions: - _permissions: Union[Dict[PermissionValue, List[str]], None] + _permissions: Union[dict[PermissionValue, list[str]], None] def __init__(self, - permissions: Optional[Dict[PermissionValue, List[str]]] = None): + permissions: Optional[dict[PermissionValue, list[str]]] = None): if permissions is None: self._permissions = {} else: self._permissions = permissions - def __getitem__(self, key: PermissionValue) -> Union[List[str], None]: + def __getitem__(self, key: PermissionValue) -> Union[list[str], None]: return self._permissions.get(key) - def __setitem__(self, key: PermissionValue, value: List[str]) -> None: + def __setitem__(self, key: PermissionValue, value: list[str]) -> None: self._permissions[key] = value def __delitem__(self, key: PermissionValue) -> None: @@ -99,12 +99,12 @@ def toJsonLdObj(self): @classmethod def fromString(cls, permstr: str): tmpstr = permstr.split('|') - permissions: Dict[PermissionValue, List[str]] = {} + permissions: dict[PermissionValue, list[str]] = {} for s in tmpstr: key, *vals = re.split("[\\s,]+", s) permissions[PermissionValue[key]] = vals return cls(permissions) @property - def permissions(self) -> Union[Dict[PermissionValue, List[str]], None]: + def permissions(self) -> Union[dict[PermissionValue, list[str]], None]: return self._permissions diff --git a/knora/dsplib/models/project.py b/knora/dsplib/models/project.py index 81a335fdc..501317f04 100644 --- a/knora/dsplib/models/project.py +++ b/knora/dsplib/models/project.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import json from pprint import pprint -from typing import List, Set, Optional, Any, Union +from typing import Optional, Any, Union from urllib.parse import quote_plus from pystrict import strict @@ -8,10 +10,9 @@ from knora.dsplib.utils.set_encoder import SetEncoder from .connection import Connection from .helpers import Actions, BaseError -from .langstring import Languages, LangStringParam, LangString +from .langstring import Languages, LangString from .model import Model - """ This module implements the handling (CRUD) of Knora projects. @@ -64,11 +65,11 @@ class Project(Model): Knora project description in a given language (Languages.EN, Languages.DE, Languages.FR, Languages.IT). A desciption can be add/replaced or removed with the methods ``addDescription``and ``rmDescription``. - keywords : Set[str] + keywords : set[str] Set of keywords describing the project. Keywords can be added/removed by the methods ``addKeyword`` and ``rmKeyword`` - ontologies : Set[str] + ontologies : set[str] Set if IRI's of the ontologies attached to the project [readonly] selfjoin : bool @@ -106,8 +107,8 @@ class Project(Model): _shortname: str _longname: str _description: LangString - _keywords: Set[str] - _ontologies: Set[str] + _keywords: set[str] + _ontologies: set[str] _selfjoin: bool _status: bool _logo: Optional[str] @@ -121,8 +122,8 @@ def __init__(self, shortname: Optional[str] = None, longname: Optional[str] = None, description: LangString = None, - keywords: Optional[Set[str]] = None, - ontologies: Optional[Set[str]] = None, + keywords: Optional[set[str]] = None, + ontologies: Optional[set[str]] = None, selfjoin: Optional[bool] = None, status: Optional[bool] = None, logo: Optional[str] = None): @@ -231,11 +232,11 @@ def rmDescription(self, lang: Union[Languages, str]) -> None: self._changed.add('description') @property - def keywords(self) -> Set[str]: + def keywords(self) -> set[str]: return self._keywords @keywords.setter - def keywords(self, value: Union[List[str], Set[str]]): + def keywords(self, value: Union[list[str], set[str]]): if isinstance(value, set): self._keywords = value self._changed.add('keywords') @@ -272,11 +273,11 @@ def rmKeyword(self, value: str): self._changed.add('keywords') @property - def ontologies(self) -> Set[str]: + def ontologies(self) -> set[str]: return self._ontologies @ontologies.setter - def ontologies(self, value: Set[str]) -> None: + def ontologies(self, value: set[str]) -> None: raise BaseError('Cannot add a ontology!') @property @@ -310,7 +311,7 @@ def logo(self, value: str) -> None: self._changed.add('logo') @classmethod - def fromJsonObj(cls, con: Connection, json_obj: Any) -> Any: + def fromJsonObj(cls, con: Connection, json_obj: Any) -> Project: """ Internal method! Should not be used directly! @@ -358,7 +359,7 @@ def fromJsonObj(cls, con: Connection, json_obj: Any) -> Any: status=status, logo=logo) - def toJsonObj(self, action: Actions) -> Any: + def toJsonObj(self, action: Actions) -> dict[str, str]: """ Internal method! Should not be used directly! @@ -390,6 +391,7 @@ def toJsonObj(self, action: Actions) -> Any: if self._status is None: raise BaseError("status must be defined (True or False!") tmp['status'] = self._status + elif action == Actions.Update: if self._shortcode is not None and 'shortcode' in self._changed: tmp['shortcode'] = self._shortcode @@ -416,7 +418,7 @@ def createDefinitionFileObj(self): "keywords": [kw for kw in self._keywords] } - def create(self) -> 'Project': + def create(self) -> Project: """ Create a new project in Knora @@ -428,7 +430,7 @@ def create(self) -> 'Project': result = self._con.post(Project.ROUTE, jsondata) return Project.fromJsonObj(self._con, result['project']) - def read(self) -> 'Project': + def read(self) -> Project: """ Read a project from Knora @@ -446,22 +448,19 @@ def read(self) -> 'Project': else: return None # Todo: throw exception - def update(self) -> Union['Project', None]: + def update(self) -> Project: """ - Udate the project info in Knora with the modified data in this project instance + Update the project information on the DSP with the modified data in this project instance - :return: JSON-object from Knora refecting the update + Returns: JSON object returned as response from DSP reflecting the update """ jsonobj = self.toJsonObj(Actions.Update) - if jsonobj: - jsondata = json.dumps(jsonobj, cls=SetEncoder) - result = self._con.put(Project.IRI + quote_plus(self.id), jsondata) - return Project.fromJsonObj(self._con, result['project']) - else: - return None + jsondata = json.dumps(jsonobj, cls=SetEncoder) + result = self._con.put(Project.IRI + quote_plus(self.id), jsondata) + return Project.fromJsonObj(self._con, result['project']) - def delete(self) -> 'Project': + def delete(self) -> Project: """ Delete the given Knora project @@ -507,7 +506,7 @@ def set_default_permissions(self, group_id: str) -> None: pprint(result) @staticmethod - def getAllProjects(con: Connection) -> List['Project']: + def getAllProjects(con: Connection) -> list[Project]: """ Get all existing projects in Knora diff --git a/knora/dsplib/models/propertyclass.py b/knora/dsplib/models/propertyclass.py index e3120d0b2..a4ea2e1f9 100644 --- a/knora/dsplib/models/propertyclass.py +++ b/knora/dsplib/models/propertyclass.py @@ -1,6 +1,6 @@ import json import re -from typing import List, Dict, Tuple, Optional, Any, Union +from typing import Tuple, Optional, Any, Union from urllib.parse import quote_plus from pystrict import strict @@ -15,18 +15,17 @@ @strict class PropertyClass(Model): - ROUTE: str = "/v2/ontologies/properties" _context: Context _id: str _name: str _ontology_id: str - _superproperties: List[str] + _superproperties: list[str] _object: str _subject: str _gui_element: str - _gui_attributes: Dict[str, str] + _gui_attributes: dict[str, str] _label: LangString _comment: LangString _editable: bool @@ -38,11 +37,11 @@ def __init__(self, id: Optional[str] = None, name: Optional[str] = None, ontology_id: Optional[str] = None, - superproperties: Optional[List[Union['PropertyClass', str]]] = None, + superproperties: Optional[list[Union['PropertyClass', str]]] = None, object: Optional[str] = None, subject: Optional[str] = None, gui_element: Optional[str] = None, - gui_attributes: Optional[Dict[str, str]] = None, + gui_attributes: Optional[dict[str, str]] = None, label: Optional[Union[LangString, str]] = None, comment: Optional[Union[LangString, str]] = None, editable: Optional[bool] = None, @@ -118,7 +117,7 @@ def ontology_id(self, value: str) -> None: raise BaseError('"ontology_id" cannot be modified!') @property - def superproperties(self) -> Optional[List[str]]: + def superproperties(self) -> Optional[list[str]]: return self._superproperties @superproperties.setter @@ -151,11 +150,11 @@ def gui_element(self, value: str) -> None: self._changed.append('gui_element') @property - def gui_attributes(self) -> Optional[Dict[str, str]]: + def gui_attributes(self) -> Optional[dict[str, str]]: return self._gui_attributes @gui_attributes.setter - def gui_attributes(self, value: List[Dict[str, str]]) -> None: + def gui_attributes(self, value: list[dict[str, str]]) -> None: self._gui_attributes = value self._changed.append('gui_attributes') @@ -258,11 +257,11 @@ def fromJsonObj(cls, con: Connection, context: Context, json_obj: Any) -> Any: ontology_id = tmp_id[0] name = tmp_id[1] superproperties_obj = json_obj.get(rdfs + ':subPropertyOf') - superproperties: List[Union[None, str]] + superproperties: list[Union[None, str]] if not isinstance(superproperties_obj, list): superproperties_obj = [superproperties_obj] # make a list out of it if superproperties_obj is not None: - superprops: List[Any] = list(filter(lambda a: a.get('@id') is not None, superproperties_obj)) + superprops: list[Any] = list(filter(lambda a: a.get('@id') is not None, superproperties_obj)) superproperties = list(map(lambda a: a['@id'], superprops)) else: superproperties = None @@ -274,7 +273,7 @@ def fromJsonObj(cls, con: Connection, context: Context, json_obj: Any) -> Any: if json_obj.get(salsah_gui + ':guiElement') is not None: gui_element = WithId(json_obj.get(salsah_gui + ':guiElement')).str() gui_attributes_list = json_obj.get(salsah_gui + ':guiAttribute') - gui_attributes: Union[None, Dict[str, str]] = None + gui_attributes: Union[None, dict[str, str]] = None if gui_attributes_list is not None: gui_attributes = {} if not isinstance(gui_attributes_list, list): @@ -348,14 +347,13 @@ def resolve_propref(resref: str): }], "@context": self._context.toJsonObj() } - if self._comment is not None: - if not self._comment.isEmpty(): - tmp['@graph'][0]["rdfs:comment"] = self._comment.toJsonLdObj() - if self._subject is not None: + if self._comment: + tmp['@graph'][0]["rdfs:comment"] = self._comment.toJsonLdObj() + if self._subject: tmp['@graph'][0]["knora-api:subjectType"] = resolve_propref(self._subject) - if self._object is not None: + if self._object: tmp['@graph'][0]["knora-api:objectType"] = resolve_propref(self._object) - if self._gui_element is not None: + if self._gui_element: tmp['@graph'][0]["salsah-gui:guiElement"] = { "@id": self._gui_element } diff --git a/knora/dsplib/models/resource.py b/knora/dsplib/models/resource.py index 4fe58a18e..8248cf146 100644 --- a/knora/dsplib/models/resource.py +++ b/knora/dsplib/models/resource.py @@ -2,7 +2,7 @@ import re from copy import deepcopy from dataclasses import dataclass -from typing import List, Set, Dict, Optional, Any, Union, Type +from typing import Optional, Any, Union, Type from urllib.parse import quote_plus from pystrict import strict @@ -47,7 +47,7 @@ class ResourceInstance(Model): ROUTE: str = "/v2/resources" - baseclasses_with_bitstream: Set[str] = { + baseclasses_with_bitstream: set[str] = { 'StillImageRepresentation', 'AudioRepresentation', 'DocumentRepresentation', @@ -56,7 +56,7 @@ class ResourceInstance(Model): 'DDDRepresentation', 'TextRepresentation' } - knora_properties: Set[str] = { + knora_properties: set[str] = { "knora-api:isPartOf", "knora-api:seqnum", } @@ -67,7 +67,7 @@ class ResourceInstance(Model): _permissions: Optional[Permissions] _user_permission: Optional[PermissionValue] _bitstream: Optional[Bitstream] - _values: Optional[Dict[Value, List[Value]]] + _values: Optional[dict[Value, list[Value]]] def __init__(self, con: Connection, @@ -78,8 +78,8 @@ def __init__(self, permissions: Optional[Permissions] = None, user_permission: Optional[PermissionValue] = None, bitstream: Optional[str] = None, - values: Optional[Dict[ - str, Union[str, List[str], Dict[str, str], List[Dict[str, str]], Value, List[Value]]]] = None): + values: Optional[dict[ + str, Union[str, list[str], dict[str, str], list[dict[str, str]], Value, list[Value]]]] = None): super().__init__(con) self._iri = iri @@ -199,7 +199,7 @@ def fromJsonLdObj(self, con: Connection, jsonld_obj: Any) -> 'ResourceInstance': if id is None: raise BaseError('Resource "id" is missing in JSON-LD from DSP-API') type = jsonld_obj.get('@type') - newinstance._values: Dict[str, Union[Value, List[Value]]] = {} + newinstance._values: dict[str, Union[Value, list[Value]]] = {} for key, obj in jsonld_obj.items(): if key in to_be_ignored: continue @@ -326,9 +326,9 @@ def print(self): class ResourceInstanceFactory: _con: Connection _project: Project - _lists = List[ListNode] - _ontologies = Dict[str, Ontology] - _ontoname2iri = Dict[str, str] + _lists = list[ListNode] + _ontologies = dict[str, Ontology] + _ontoname2iri = dict[str, str] _context: Context def __init__(self, @@ -369,17 +369,17 @@ def __init__(self, self._context.update(self._ontologies[name].context) @property - def lists(self) -> List[ListNode]: + def lists(self) -> list[ListNode]: return self._lists - def get_resclass_names(self) -> List[str]: - resclass_names: List[str] = [] + def get_resclass_names(self) -> list[str]: + resclass_names: list[str] = [] for name, onto in self._ontologies.items(): for resclass in onto.resource_classes: resclass_names.append(onto.context.get_prefixed_iri(resclass.id)) return resclass_names - def _get_baseclass(self, superclasses: List[str]) -> Union[str, None]: + def _get_baseclass(self, superclasses: list[str]) -> Union[str, None]: for sc in superclasses: ontoname, classname = sc.split(':') if ontoname == 'knora-api': @@ -395,7 +395,7 @@ def get_resclass(self, prefixedresclass: str) -> Type: prefix, resclass_name = prefixedresclass.split(':') resclass = [x for x in self._ontologies[prefix].resource_classes if x.name == resclass_name][0] baseclass = self._get_baseclass(resclass.superclasses) - props: Dict[str, Propinfo] = {} + props: dict[str, Propinfo] = {} switcher = { 'knora-api:TextValue': TextValue, 'knora-api:ColorValue': ColorValue, diff --git a/knora/dsplib/models/resourceclass.py b/knora/dsplib/models/resourceclass.py index 5b9a6b085..65d53c358 100644 --- a/knora/dsplib/models/resourceclass.py +++ b/knora/dsplib/models/resourceclass.py @@ -1,7 +1,7 @@ import json import re from enum import Enum -from typing import List, Dict, Tuple, Optional, Any, Union +from typing import Tuple, Optional, Any, Union from urllib.parse import quote_plus from pystrict import strict @@ -22,7 +22,6 @@ @strict class HasProperty(Model): - ROUTE: str = "/v2/ontologies/cardinalities" class Ptype(Enum): @@ -263,7 +262,7 @@ def update(self, last_modification_date: LastModificationDate) -> Tuple[LastModi if self._cardinality is None: raise BaseError("Cardinality id required") jsonobj = self.toJsonObj(last_modification_date, Actions.Update) - jsondata = json.dumps(jsonobj, indent=3, cls=SetEncoder) + jsondata = json.dumps(jsonobj, indent=4, cls=SetEncoder) result = self._con.put(HasProperty.ROUTE, jsondata) last_modification_date = LastModificationDate(result['knora-api:lastModificationDate']) # TODO: self._changed = str() @@ -318,7 +317,7 @@ class ResourceClass(Model): ontology_id: str The IRI/Id of the ontology this resource class belongs to - superlcasses: str, List[str] + superlcasses: str, list[str] This is a list of superclasses for this resource class. Usually a project specific class must at least be a subclass of "Resource", but can be subclassed of any other valid resource class. In addition, external ontologies may be referenced: @@ -340,7 +339,7 @@ class ResourceClass(Model): permission: str The default permissions to be used if an instance of this resource class is being created - has_properties: Dict[str, HasProperty] + has_properties: dict[str, HasProperty] Holds a dict with the property names as keys and a HasProperty instance. HasProperty holds the information, how this resource class uses this property (basically the cardinality) @@ -385,11 +384,11 @@ class ResourceClass(Model): _id: str _name: str _ontology_id: str - _superclasses: List[str] + _superclasses: list[str] _label: LangString _comment: LangString _permissions: str - _has_properties: Dict[str, HasProperty] + _has_properties: dict[str, HasProperty] def __init__(self, con: Connection, @@ -397,11 +396,11 @@ def __init__(self, id: Optional[str] = None, name: Optional[str] = None, ontology_id: Optional[str] = None, - superclasses: Optional[List[Union['ResourceClass', str]]] = None, + superclasses: Optional[list[Union['ResourceClass', str]]] = None, label: Optional[Union[LangString, str]] = None, comment: Optional[Union[LangString, str]] = None, permissions: Optional[str] = None, - has_properties: Optional[Dict[str, HasProperty]] = None): + has_properties: Optional[dict[str, HasProperty]] = None): """ Create an instance of ResourceClass @@ -486,11 +485,11 @@ def ontology_id(self, value: str) -> None: raise BaseError('"ontology_id" cannot be modified!') @property - def superclasses(self) -> Optional[List[str]]: + def superclasses(self) -> Optional[list[str]]: return self._superclasses @superclasses.setter - def superclasses(self, value: List[str]) -> None: + def superclasses(self, value: list[str]) -> None: raise BaseError('"superclasses" cannot be modified!') @property @@ -552,7 +551,7 @@ def permissions(self, value: str) -> None: raise BaseError('"permissions" cannot be modified!') @property - def has_properties(self) -> Dict[str, HasProperty]: + def has_properties(self) -> dict[str, HasProperty]: return self._has_properties @has_properties.setter @@ -570,7 +569,7 @@ def addProperty(self, property_id: str, cardinality: Cardinality, gui_order: Optional[int] = None, - ) -> Optional[LastModificationDate]: + ) -> LastModificationDate: if self._has_properties.get(property_id) is None: latest_modification_date, resclass = HasProperty(con=self._con, context=self._context, @@ -639,10 +638,10 @@ def fromJsonObj(cls, con: Connection, context: Context, json_obj: Any) -> Any: name = tmp_id[1] superclasses_obj = json_obj.get(rdfs + ':subClassOf') if superclasses_obj is not None: - supercls: List[Any] = list(filter(lambda a: a.get('@id') is not None, superclasses_obj)) - superclasses: List[str] = list(map(lambda a: a['@id'], supercls)) - has_props: List[Any] = list(filter(lambda a: a.get('@type') == (owl + ':Restriction'), superclasses_obj)) - has_properties: Dict[HasProperty] = dict(map(lambda a: HasProperty.fromJsonObj(con, context, a), has_props)) + supercls: list[Any] = list(filter(lambda a: a.get('@id') is not None, superclasses_obj)) + superclasses: list[str] = list(map(lambda a: a['@id'], supercls)) + has_props: list[Any] = list(filter(lambda a: a.get('@type') == (owl + ':Restriction'), superclasses_obj)) + has_properties: dict[HasProperty] = dict(map(lambda a: HasProperty.fromJsonObj(con, context, a), has_props)) # # now remove the ...Value stuff from resource pointers: A resource pointer is returned as 2 properties: # one a direct link, the other the pointer to a link value @@ -679,7 +678,8 @@ def resolve_resref(resref: str): self._context.add_context(tmp[0]) return {"@id": resref} # fully qualified name in the form "prefix:name" else: - return {"@id": self._context.prefix_from_iri(self._ontology_id) + ':' + tmp[1]} # ":name" in current ontology + return {"@id": self._context.prefix_from_iri(self._ontology_id) + ':' + tmp[ + 1]} # ":name" in current ontology else: return {"@id": "knora-api:" + resref} # no ":", must be from knora-api! @@ -701,7 +701,7 @@ def resolve_resref(resref: str): else: superclasses = list(map(resolve_resref, self._superclasses)) if self._comment is None or self._comment.isEmpty(): - self._comment = LangString("no comment available") + self._comment = LangString({"en": "[no comment provided]"}) if self._label is None or self._label.isEmpty(): self._label = LangString("no label available") tmp = { @@ -771,7 +771,7 @@ def delete(self, last_modification_date: LastModificationDate) -> LastModificati ResourceClass.ROUTE + '/' + quote_plus(self._id) + '?lastModificationDate=' + str(last_modification_date)) return LastModificationDate(result['knora-api:lastModificationDate']) - def createDefinitionFileObj(self, context: Context, shortname: str, skiplist: List[str]): + def createDefinitionFileObj(self, context: Context, shortname: str, skiplist: list[str]): resource = { "name": self._name, "labels": self._label.createDefinitionFileObj(), diff --git a/knora/dsplib/models/user.py b/knora/dsplib/models/user.py index e77adf969..249d3bb6e 100644 --- a/knora/dsplib/models/user.py +++ b/knora/dsplib/models/user.py @@ -2,7 +2,7 @@ import os import sys import urllib.parse -from typing import List, Set, Dict, Optional, Any, Union +from typing import Optional, Any, Union from urllib.parse import quote_plus from pystrict import strict @@ -81,11 +81,11 @@ class User(Model): sysadmin : bool True, if user is system administrator [read/write] - in_groups : Set[str] + in_groups : set[str] Set of group IRI's the user is member of [readonly]. Use ``addToGroup``and ``rmFromGroup`` to modify group membership - in_projects : Set[str] + in_projects : set[str] Set of project IRI's the user belongs to Use ``addToproject``and ``rmFromproject`` to modify project membership @@ -147,13 +147,13 @@ class User(Model): _lang: Languages _status: bool _sysadmin: bool - _in_groups: Set[str] - _in_projects: Dict[str, bool] - _add_to_project: Dict[str, bool] - _rm_from_project: Dict[str, bool] - _add_to_group: Set[str] - _rm_from_group: Set[str] - _change_admin: Dict[str, bool] + _in_groups: set[str] + _in_projects: dict[str, bool] + _add_to_project: dict[str, bool] + _rm_from_project: dict[str, bool] + _add_to_group: set[str] + _rm_from_group: set[str] + _change_admin: dict[str, bool] def __init__(self, con: Connection, @@ -166,8 +166,8 @@ def __init__(self, lang: Optional[Union[str, Languages]] = None, status: Optional[bool] = None, sysadmin: Optional[bool] = None, - in_projects: Optional[Dict[str, bool]] = None, - in_groups: Optional[Set[str]] = None): + in_projects: Optional[dict[str, bool]] = None, + in_groups: Optional[set[str]] = None): """ Constructor for User @@ -324,7 +324,7 @@ def sysadmin(self, value: bool): self._changed.add('sysadmin') @property - def in_groups(self) -> Set[str]: + def in_groups(self) -> set[str]: return self._in_groups @in_groups.setter @@ -364,7 +364,7 @@ def rmFromGroup(self, value: str): raise BaseError("User is not in groups!") @property - def in_projects(self) -> Dict[str, bool]: + def in_projects(self) -> dict[str, bool]: return self._in_projects @in_projects.setter @@ -437,7 +437,7 @@ def unmakeProjectAdmin(self, value: str): raise BaseError("User is not member of project!") @property - def changed(self) -> Set[str]: + def changed(self) -> set[str]: return self._changed def has_changed(self, name: str): @@ -471,8 +471,8 @@ def fromJsonObj(cls, con: Connection, json_obj: Any): if status is None: raise BaseError("Status is missing in JSON from knora") - in_projects: Dict[str, bool] = {} - in_groups: Set[str] = set() + in_projects: dict[str, bool] = {} + in_groups: set[str] = set() if json_obj.get('permissions') is not None and json_obj['permissions'].get('groupsPerProject') is not None: sysadmin = False project_groups = json_obj['permissions']['groupsPerProject'] @@ -664,7 +664,7 @@ def delete(self): return User.fromJsonObj(self._con, result['user']) @staticmethod - def getAllUsers(con: Connection) -> List[Any]: + def getAllUsers(con: Connection) -> list[Any]: """ Get a list of all users (static method) @@ -678,7 +678,7 @@ def getAllUsers(con: Connection) -> List[Any]: return list(map(lambda a: User.fromJsonObj(con, a), result['users'])) @staticmethod - def getAllUsersForProject(con: Connection, proj_shortcode: str) -> List[Any]: + def getAllUsersForProject(con: Connection, proj_shortcode: str) -> list[Any]: """ Get a list of all users that belong to a project(static method) diff --git a/knora/dsplib/models/value.py b/knora/dsplib/models/value.py index e0eb05725..ecda8d6be 100644 --- a/knora/dsplib/models/value.py +++ b/knora/dsplib/models/value.py @@ -1,9 +1,8 @@ import re -from typing import List, Dict, Optional, Any, Union +from typing import Optional, Any, Union from pystrict import strict -from .group import Group from .helpers import IriTest, Actions, BaseError from .langstring import LangString from .listnode import ListNode @@ -26,7 +25,7 @@ def __str__(self) -> str: def getXml(self) -> str: return self.__xmlstr - def findall(self) -> Union[List[str], None]: + def findall(self) -> Union[list[str], None]: return self.__iriregexp.findall(self.__xmlstr) def replace(self, fromStr: str, toStr: str) -> None: @@ -95,7 +94,7 @@ def upermission(self) -> str: def comment(self): return self._comment - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = {} if action == Actions.Create: if self._permissions: @@ -127,7 +126,7 @@ def get_typed_value(key: str, jsonld_obj: Any) -> Union[str, float]: raise BaseError("Error in JSON-LD returned!") @staticmethod - def getFromJsonLd(jsonld_obj) -> Dict[str, Union[str, float]]: + def getFromJsonLd(jsonld_obj) -> dict[str, Union[str, float]]: return { 'iri': jsonld_obj.get("@id"), @@ -171,7 +170,7 @@ def mapping(self) -> str: return self._mapping @classmethod - def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: + def fromJsonLdObj(cls, jsonld_obj: Any) -> dict[str, Any]: tmp = Value.getFromJsonLd(jsonld_obj) if jsonld_obj.get("knora-api:textValueAsXml") is not None: @@ -182,7 +181,7 @@ def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: tmp['value'] = jsonld_obj.get("knora-api:valueAsString") return cls(**tmp) - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = super().toJsonLdObj(action) if action == Actions.Create: tmp['@type'] = 'knora-api:TextValue' @@ -230,12 +229,12 @@ def value(self) -> str: return self._value @classmethod - def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: + def fromJsonLdObj(cls, jsonld_obj: Any) -> dict[str, Any]: tmp = Value.getFromJsonLd(jsonld_obj) tmp['value'] = jsonld_obj.get("knora-api:colorValueAsColor") return cls(**tmp) - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = super().toJsonLdObj(action) if action == Actions.Create: tmp['@type'] = "knora-api:ColorValue" @@ -318,7 +317,7 @@ def value(self) -> str: return datestr @classmethod - def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: + def fromJsonLdObj(cls, jsonld_obj: Any) -> dict[str, Any]: tmp = Value.getFromJsonLd(jsonld_obj) datestr = "" @@ -343,7 +342,7 @@ def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: tmp['value'] = datestr return cls(**tmp) - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = super().toJsonLdObj(action) if action == Actions.Create: tmp['@type'] = "knora-api:DateValue" @@ -425,12 +424,12 @@ def value(self) -> float: return self._value @classmethod - def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: + def fromJsonLdObj(cls, jsonld_obj: Any) -> dict[str, Any]: tmp = Value.getFromJsonLd(jsonld_obj) tmp['value'] = Value.get_typed_value("knora-api:decimalValueAsDecimal", jsonld_obj) return cls(**tmp) - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = super().toJsonLdObj(action) if action == Actions.Create: tmp['@type'] = "knora-api:DecimalValue" @@ -469,12 +468,12 @@ def value(self) -> str: return self._value @classmethod - def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: + def fromJsonLdObj(cls, jsonld_obj: Any) -> dict[str, Any]: tmp = Value.getFromJsonLd(jsonld_obj) tmp['value'] = jsonld_obj.get('knora-api:geometryValueAsGeometry') return cls(**tmp) - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = super().toJsonLdObj(action) if action == Actions.Create: tmp['@type'] = "knora-api:GeomValue" @@ -510,12 +509,12 @@ def value(self) -> str: return self._value @classmethod - def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: + def fromJsonLdObj(cls, jsonld_obj: Any) -> dict[str, Any]: tmp = Value.getFromJsonLd(jsonld_obj) tmp['value'] = jsonld_obj.get('knora-api:geonameValueAsGeonameCode') return cls(**tmp) - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = super().toJsonLdObj(action) if action == Actions.Create: tmp['@type'] = "knora-api:GeonameValue" @@ -558,12 +557,12 @@ def value(self) -> int: return self._value @classmethod - def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: + def fromJsonLdObj(cls, jsonld_obj: Any) -> dict[str, Any]: tmp = Value.getFromJsonLd(jsonld_obj) tmp['value'] = jsonld_obj.get("knora-api:intValueAsInt") return cls(**tmp) - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = super().toJsonLdObj(action) if action == Actions.Create: tmp['@type'] = "knora-api:IntValue" @@ -609,12 +608,12 @@ def value(self) -> bool: return self._value @classmethod - def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: + def fromJsonLdObj(cls, jsonld_obj: Any) -> dict[str, Any]: tmp = Value.getFromJsonLd(jsonld_obj) tmp['value'] = jsonld_obj.get("knora-api:booleanValueAsBoolean") return cls(**tmp) - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = super().toJsonLdObj(action) if action == Actions.Create: tmp['@type'] = "knora-api:BooleanValue" @@ -654,12 +653,12 @@ def value(self) -> str: return self._value @classmethod - def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: + def fromJsonLdObj(cls, jsonld_obj: Any) -> dict[str, Any]: tmp = Value.getFromJsonLd(jsonld_obj) tmp['value'] = Value.get_typed_value("knora-api:uriValueAsUri", jsonld_obj) return cls(**tmp) - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = super().toJsonLdObj(action) if action == Actions.Create: tmp['@type'] = "knora-api:UriValue" @@ -707,12 +706,12 @@ def value(self) -> str: return self._value @classmethod - def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: + def fromJsonLdObj(cls, jsonld_obj: Any) -> dict[str, Any]: tmp = Value.getFromJsonLd(jsonld_obj) tmp['value'] = Value.get_typed_value("knora-api:timeValueAsTimeStamp", jsonld_obj) return cls(**tmp) - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = super().toJsonLdObj(action) if action == Actions.Create: tmp['@type'] = "knora-api:TimeValue" @@ -770,14 +769,14 @@ def iv_end(self) -> float: return self._iv_end @classmethod - def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: + def fromJsonLdObj(cls, jsonld_obj: Any) -> dict[str, Any]: tmp = Value.getFromJsonLd(jsonld_obj) start = Value.get_typed_value("knora-api:intervalValueHasStart", jsonld_obj) end = Value.get_typed_value("knora-api:intervalValueHasEnd", jsonld_obj) tmp['value'] = str(start) + ":" + str(end) return cls(**tmp) - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = super().toJsonLdObj(action) if action == Actions.Create: tmp['@type'] = "knora-api:IntervalValue" @@ -802,7 +801,7 @@ class ListValue(Value): def __init__(self, value: str, - lists: List[ListNode] = None, + lists: list[ListNode] = None, comment: Optional[LangString] = None, permissions: Optional[Permissions] = None, upermission: Optional[PermissionValue] = None, @@ -810,7 +809,7 @@ def __init__(self, ark_url: Optional[str] = None, vark_url: Optional[str] = None): - def find_listnode(nodes: List[ListNode], name: str) -> Optional[str]: + def find_listnode(nodes: list[ListNode], name: str) -> Optional[str]: for node in nodes: if node.name == name: return node.id @@ -859,12 +858,12 @@ def value(self) -> str: return self._value @classmethod - def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: + def fromJsonLdObj(cls, jsonld_obj: Any) -> dict[str, Any]: tmp = Value.getFromJsonLd(jsonld_obj) tmp['value'] = Value.get_typed_value("knora-api:listValueAsListNode", jsonld_obj) return cls(**tmp) - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = super().toJsonLdObj(action) if action == Actions.Create: tmp['@type'] = "knora-api:ListValue" @@ -920,7 +919,7 @@ def reslabel(self) -> str: return self._reslabel @classmethod - def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: + def fromJsonLdObj(cls, jsonld_obj: Any) -> dict[str, Any]: tmp = Value.getFromJsonLd(jsonld_obj) linked_resource = jsonld_obj.get("knora-api:linkValueHasTarget") if linked_resource is not None: @@ -929,7 +928,7 @@ def fromJsonLdObj(cls, jsonld_obj: Any) -> Dict[str, Any]: tmp['reslabel'] = linked_resource["rdfs:label"] return cls(**tmp) - def toJsonLdObj(self, action: Actions) -> Dict[str, Any]: + def toJsonLdObj(self, action: Actions) -> dict[str, Any]: tmp = super().toJsonLdObj(action) if action == Actions.Create: tmp['@type'] = "knora-api:LinkValue" diff --git a/knora/dsplib/utils/excel_to_json_lists.py b/knora/dsplib/utils/excel_to_json_lists.py index 2eb61d71b..3f961b490 100644 --- a/knora/dsplib/utils/excel_to_json_lists.py +++ b/knora/dsplib/utils/excel_to_json_lists.py @@ -5,7 +5,6 @@ import os import re import unicodedata -from typing import List, Dict import jsonschema from jsonschema import validate @@ -15,8 +14,8 @@ cell_names = [] -def get_values_from_excel(excelfiles: List[str], base_file: str, parentnode: {}, row: int, col: int, - preval: List[str]) -> int: +def get_values_from_excel(excelfiles: list[str], base_file: str, parentnode: {}, row: int, col: int, + preval: list[str]) -> int: """ This function calls itself recursively to go through the Excel files. It extracts the cell values and creates the JSON list file. @@ -109,7 +108,7 @@ def get_values_from_excel(excelfiles: List[str], base_file: str, parentnode: {}, return row - 1 -def make_json_list_from_excel(rootnode: {}, excelfiles: List[str]) -> None: +def make_json_list_from_excel(rootnode: {}, excelfiles: list[str]) -> None: """ Reads Excel files and makes a JSON list file from them. The JSON can then be used in an ontology that is uploaded to the DaSCH Service Platform. @@ -206,7 +205,7 @@ def check_language_code(lang_code: str) -> bool: return False -def make_root_node_from_args(excelfiles: List[str], listname_from_args: str, comments: Dict[str, str]) -> dict: +def make_root_node_from_args(excelfiles: list[str], listname_from_args: str, comments: dict[str, str]) -> dict: """ Creates the root node for the JSON list @@ -286,7 +285,7 @@ def prepare_list_creation(excelfolder: str, listname: str, comments: dict): Returns: rootnode (dict): The rootnode of the list as a dictionary - excel_files (List[str]): list of the Excel files to process + excel_files (list[str]): list of the Excel files to process """ # reset the global variables before list creation starts global cell_names diff --git a/knora/dsplib/utils/expand_all_lists.py b/knora/dsplib/utils/expand_all_lists.py index a998de2eb..3e9009e1b 100644 --- a/knora/dsplib/utils/expand_all_lists.py +++ b/knora/dsplib/utils/expand_all_lists.py @@ -1,32 +1,32 @@ -from typing import Dict, List +from typing import Any from knora.dsplib.utils.excel_to_json_lists import make_json_list_from_excel, prepare_list_creation -def expand_lists_from_excel(data_model: Dict) -> List[str]: +def expand_lists_from_excel(data_model: dict[str, Any]) -> list[str]: """ - Gets all lists from an ontology and expands them to json if they are only referenced via an Excel file + Gets all list definitions from a data model and expands them to JSON if they are only referenced via an Excel file Args: - data_model: The data model (json) the lists are read from + data_model: The data model (JSON) the lists are read from Returns: - A list of all expanded lists. It can be added to the root node of an ontology as list section. + A list of all expanded lists. It can be added to the root node of an ontology as lists section. """ # create and add lists from Excel references to the ontology - lists = data_model['project'].get('lists') + lists = data_model["project"].get("lists") if not lists: return [] new_lists = [] for rootnode in lists: - nodes = rootnode['nodes'] + nodes = rootnode["nodes"] # check if the folder parameter is used - if rootnode.get('nodes') and isinstance(nodes, dict) and nodes.get('folder'): + if rootnode.get("nodes") and isinstance(nodes, dict) and nodes.get("folder"): # get the Excel files from the folder and create the rootnode of the list - excel_folder = nodes['folder'] - rootnode, excel_files = prepare_list_creation(excel_folder, rootnode.get('name'), rootnode.get('comments')) + excel_folder = nodes["folder"] + rootnode, excel_files = prepare_list_creation(excel_folder, rootnode.get("name"), rootnode.get("comments")) # create the list from the Excel files make_json_list_from_excel(rootnode, excel_files) diff --git a/knora/dsplib/utils/onto_create_lists.py b/knora/dsplib/utils/onto_create_lists.py index e0cc86cc3..355aead51 100644 --- a/knora/dsplib/utils/onto_create_lists.py +++ b/knora/dsplib/utils/onto_create_lists.py @@ -1,21 +1,23 @@ import json -from typing import Any, Dict, List +from typing import Any, Optional from .expand_all_lists import expand_lists_from_excel from .onto_validate import validate_ontology from ..models.connection import Connection +from ..models.helpers import BaseError from ..models.listnode import ListNode from ..models.project import Project -def list_creator(con: Connection, project: Project, parent_node: ListNode, nodes: List[Dict[Any, Any]]) -> List[Dict[Any, Any]]: +def create_list_node(con: Connection, project: Project, parent_node: ListNode, nodes: list[dict[str, Any]]) -> list[ + dict[str, Any]]: """ Creates the list on the DSP server Args: - con: The connection to the DSP server - project: The project which the lists should be added - parent_node: The root node of the list + con: Connection to the DSP server + project: Project which the lists should be added to + parent_node: Root node of the list nodes: List of nodes the list is made of Returns: @@ -23,21 +25,28 @@ def list_creator(con: Connection, project: Project, parent_node: ListNode, nodes """ nodelist = [] for node in nodes: - new_node = ListNode(con=con, project=project, label=node["labels"], comments=node.get("comments"), - name=node["name"], - parent=parent_node).create() - if node.get('nodes') is not None: - subnode_list = list_creator(con, project, new_node, node['nodes']) - nodelist.append({new_node.name: {"id": new_node.id, 'nodes': subnode_list}}) - else: - nodelist.append({new_node.name: {"id": new_node.id}}) + new_node = None + try: + new_node = ListNode(con=con, project=project, label=node["labels"], comments=node.get("comments"), + name=node["name"], + parent=parent_node).create() + except BaseError as err: + print(f"ERROR while trying to create list node '{node['name']}'. The error message was {err.message}") + exit(1) + if new_node: + # if node has child nodes, call the method recursively + if node.get("nodes"): + subnode_list = create_list_node(con, project, new_node, node["nodes"]) + nodelist.append({new_node.name: {"id": new_node.id, "nodes": subnode_list}}) + else: + nodelist.append({new_node.name: {"id": new_node.id}}) return nodelist def create_lists(input_file: str, lists_file: str, server: str, user: str, password: str, verbose: bool, - dump: bool = False) -> Dict[str, Any]: + dump: bool = False) -> dict[str, Any]: """ - Creates the lists on the DSP server + Handles the list creation Args: input_file: Path to the json data model file @@ -61,7 +70,7 @@ def create_lists(input_file: str, lists_file: str, server: str, user: str, passw new_lists = expand_lists_from_excel(data_model) # add the newly created lists from Excel to the ontology - data_model['project']['lists'] = new_lists + data_model["project"]["lists"] = new_lists # validate the ontology if validate_ontology(data_model): @@ -77,28 +86,56 @@ def create_lists(input_file: str, lists_file: str, server: str, user: str, passw con.start_logging() # get the project which must exist - project = Project(con=con, shortcode=data_model['project']['shortcode'], ).read() - assert project is not None + project: Optional[Project] = None + try: + project = Project(con=con, shortcode=data_model["project"]["shortcode"]).read() + except BaseError as err: + print( + f"ERROR while trying to create the lists. Referenced project couldn't be read from the server. The error message was: {err.message}") + exit(1) # create the lists if verbose: - print('Create lists...') + print("Create lists...") - lists = data_model['project'].get('lists') + all_lists: Optional[list[ListNode]] = ListNode.getAllLists(con, project.id) + lists = data_model["project"].get("lists") list_root_nodes = {} - if lists is not None: + if lists: for rootnode in lists: + rootnode_name = rootnode["name"] + # check if list already exists + list_exists: bool = False + if all_lists: + for list_item in all_lists: + if list_item.project == project.id and list_item.name == rootnode_name: + list_root_nodes[list_item.name] = {"id": list_item.id, "nodes": rootnode["nodes"]} + list_exists = True + if list_exists: + print(f"WARN List '{rootnode_name}' already exists. Skipping...") + continue + if verbose: - print(' Create list:' + rootnode['name']) - root_list_node = ListNode(con=con, project=project, label=rootnode['labels'], - comments=rootnode.get('comments'), - name=rootnode['name']).create() - if rootnode.get('nodes') is not None: - list_nodes = list_creator(con, project, root_list_node, rootnode['nodes']) - list_root_nodes[rootnode['name']] = {'id': root_list_node.id, 'nodes': list_nodes} - - with open(lists_file, 'w', encoding='utf-8') as fp: - json.dump(list_root_nodes, fp, indent=3, sort_keys=True) - print(f'The IRI for the created nodes can be found in {lists_file}.') + print(f"Creating list '{rootnode_name}'.") + + root_list_node = None + try: + root_list_node = ListNode(con=con, project=project, label=rootnode["labels"], + comments=rootnode.get("comments"), + name=rootnode_name).create() + except BaseError as err: + print(f"ERROR while trying to create the list '{rootnode_name}'. The error message was: {err.message}") + exit(1) + except Exception as exception: + print( + f"ERROR while trying to create the list '{rootnode_name}'. The error message was: {exception}") + exit(1) + if rootnode.get("nodes") and root_list_node and project: + list_nodes = create_list_node(con, project, root_list_node, rootnode["nodes"]) + list_root_nodes[rootnode["name"]] = {"id": root_list_node.id, "nodes": list_nodes} + + with open(lists_file, "w", encoding="utf-8") as fp: + json.dump(list_root_nodes, fp, indent=4, sort_keys=True) + print(f"The IRI for the created nodes can be found in '{lists_file}'.") return list_root_nodes diff --git a/knora/dsplib/utils/onto_create_ontology.py b/knora/dsplib/utils/onto_create_ontology.py index 301721ee9..786c9bfc0 100644 --- a/knora/dsplib/utils/onto_create_ontology.py +++ b/knora/dsplib/utils/onto_create_ontology.py @@ -1,20 +1,20 @@ -"""This module handles the ontology creation and upload to a DSP server. This includes the creation of the project, -groups, users, lists, resource classes, properties and cardinalities. """ +"""This module handles the ontology creation, update and upload to a DSP server. This includes the creation and update +of the project, the creation of groups, users, lists, resource classes, properties and cardinalities.""" import json -from typing import Dict, List, Optional, Set - -from .expand_all_lists import expand_lists_from_excel -from .onto_create_lists import create_lists -from .onto_validate import validate_ontology -from ..models.connection import Connection -from ..models.group import Group -from ..models.helpers import BaseError, Cardinality, Context -from ..models.langstring import LangString -from ..models.ontology import Ontology -from ..models.project import Project -from ..models.propertyclass import PropertyClass -from ..models.resourceclass import ResourceClass -from ..models.user import User +from typing import Union, Optional, Any + +from knora.dsplib.models.connection import Connection +from knora.dsplib.models.group import Group +from knora.dsplib.models.helpers import BaseError, Cardinality, Context +from knora.dsplib.models.langstring import LangString +from knora.dsplib.models.ontology import Ontology +from knora.dsplib.models.project import Project +from knora.dsplib.models.propertyclass import PropertyClass +from knora.dsplib.models.resourceclass import ResourceClass +from knora.dsplib.models.user import User +from knora.dsplib.utils.expand_all_lists import expand_lists_from_excel +from knora.dsplib.utils.onto_create_lists import create_lists +from knora.dsplib.utils.onto_validate import validate_ontology def login(server: str, user: str, password: str) -> Connection: @@ -34,27 +34,291 @@ def login(server: str, user: str, password: str) -> Connection: return con +def create_project(con: Connection, data_model: dict[str, Any], verbose: bool) -> Project: + """ + Creates a project on a DSP server with information provided in the data_model + + Args: + con: connection instance to connect to the DSP server + data_model: The data model as JSON + verbose: Prints out more information if set to True + + Returns: + created project + """ + project_shortcode = data_model["project"]["shortcode"] + project_shortname = data_model["project"]["shortname"] + + try: + project = Project(con=con, + shortcode=data_model["project"]["shortcode"], + shortname=data_model["project"]["shortname"], + longname=data_model["project"]["longname"], + description=LangString(data_model["project"].get("descriptions")), + keywords=set(data_model["project"].get("keywords")), + selfjoin=False, + status=True).create() + if verbose: + print(f"Created project '{project_shortname}' ({project_shortcode}).") + return project + except BaseError as err: + print( + f"ERROR while trying to create project '{project_shortname}' ({project_shortcode}). The error message was: {err.message}") + exit(1) + except Exception as exception: + print( + f"ERROR while trying to create project '{project_shortname}' ({project_shortcode}). The error message was: {exception}") + exit(1) + + +def update_project(project: Project, data_model: dict[str, Any], verbose: bool) -> Project: + """ + Updates a project on a DSP server with information provided in the data_model + + Args: + project: The project to be updated + data_model: The data model as JSON + verbose: Prints out more information if set to True + + Returns: + updated project + """ + project_shortcode = data_model["project"]["shortcode"] + project_shortname = data_model["project"]["shortname"] + project.longname = data_model["project"]["longname"] + project.description = data_model["project"].get("descriptions") + project.keywords = data_model["project"].get("keywords") + try: + updated_project = project.update() + if verbose: + print(f"Updated project '{project_shortname}' ({project_shortcode}).") + return updated_project + except BaseError as err: + print( + f"ERROR while trying to update project '{project_shortname}' ({project_shortcode}). The error message was: {err.message}") + exit(1) + except Exception as exception: + print( + f"ERROR while trying to update project '{project_shortname}' ({project_shortcode}). The error message was: {exception}") + exit(1) + + +def create_groups(con: Connection, groups: list[dict[str, str]], project: Project, verbose: bool) -> dict[str, Group]: + """ + Creates group(s) on a DSP server from a list of group definitions + + Args: + con: connection instance to connect to the DSP server + groups: List of definitions of the groups (JSON) to be created + project: Project the group(s) should be added to + verbose: Prints out more information if set to True + + Returns: + Dict with group names and groups + """ + new_groups: dict[str, Group] = {} + for group in groups: + group_name = group["name"] + + # check if the group already exists, skip if so + all_groups: Optional[list[Group]] = Group.getAllGroups(con) + group_exists: bool = False + if all_groups: + for group_item in all_groups: + if group_item.project == project.id and group_item.name == group_name: + group_exists = True + if group_exists: + print(f"WARN Group '{group_name}' already exists. Skipping...") + continue + + # check if status is defined, set default value if not + group_status: Optional[str] = group.get("status") + group_status_bool = True + if isinstance(group_status, str): + group_status_bool = json.loads(group_status.lower()) # lower() converts string to boolean + + # check if selfjoin is defined, set default value if not + group_selfjoin: Optional[str] = group.get("selfjoin") + group_selfjoin_bool = False + if isinstance(group_selfjoin, str): + group_selfjoin_bool = json.loads(group_selfjoin.lower()) # lower() converts string to boolean + + # create the group + try: + new_group: Group = Group(con=con, + name=group_name, + descriptions=LangString(group["descriptions"]), + project=project, + status=group_status_bool, + selfjoin=group_selfjoin_bool).create() + if verbose: + print(f"Created group '{group_name}'.") + if new_group.name: + new_groups[new_group.name] = new_group + + except BaseError as err: + print(f"ERROR while trying to create group '{group_name}'. The error message was: {err.message}") + exit(1) + except Exception as exception: + print(f"ERROR while trying to create group '{group_name}'. The error message was: {exception}") + exit(1) + return new_groups + + +def create_users(con: Connection, users: list[dict[str, str]], groups: dict[str, Group], project: Project, + verbose: bool) -> None: + """ + Creates user(s) on a DSP server from a list of user definitions + + Args: + con: connection instance to connect to the DSP server + users: List of definitions of the users (JSON) to be created + groups: Dict with group definitions defined inside the actual ontology + project: Project the user(s) should be added to + verbose: Prints more information if set to True + + Returns: + None + """ + for user in users: + username = user["username"] + + # check if the user already exists, skip if so + maybe_user: Optional[User] = None + try: + maybe_user = User(con, email=user["email"]).read() + except BaseError: + pass + if maybe_user: + print(f"WARN User '{username}' already exists. Skipping...") + continue + + sysadmin = False + group_ids: set[str] = set() + project_info: dict[str, bool] = {} + + # if "groups" is provided add user to the group(s) + user_groups = user.get("groups") + if user_groups: + all_groups: Optional[list[Group]] = Group.getAllGroups(con) + for full_group_name in user_groups: + if verbose: + print(f"Add user '{username}' to group '{full_group_name}'.") + # full_group_name has the form '[project_shortname]:group_name' or 'SystemAdmin' + # if project_shortname is omitted, the group belongs to the current project + tmp_group_name: Union[list[str], str] = full_group_name.split( + ":") if ":" in full_group_name else full_group_name + + if len(tmp_group_name) == 2: + project_shortname = tmp_group_name[0] + group_name = tmp_group_name[1] + + group: Optional[Group] = None + if project_shortname: # full_group_name refers to an already existing group on DSP + # check that group exists + if all_groups: + for g in all_groups: + if g.project == project.id and g.name == group_name: + group = g + else: + print(f"WARN '{group_name}' is referring to a group on DSP but no groups found.") + + else: # full_group_name refers to a group inside the same ontology + group = groups.get(group_name) + if group is None: + print(f"WARN Group '{group_name}' not found in actual ontology.") + else: + if isinstance(group.id, str): + group_ids.add(group.id) + elif tmp_group_name == "SystemAdmin": + sysadmin = True + else: + print(f"WARN Provided group '{full_group_name}' for user '{username}' is not valid. Skipping...") + + # if "projects" is provided, add user to the projects(s) + user_projects = user.get("projects") + if user_projects: + all_projects: list[Project] = project.getAllProjects(con) + for full_project_name in user_projects: + if verbose: + print(f"Add user '{username}' to project '{full_project_name}'.") + # full_project_name has the form '[project_name]:member' or '[project_name]:admin' + # if project_name is omitted, the user is added to the current project + tmp_group_name = full_project_name.split(":") + + if not len(tmp_group_name) == 2: + print( + f"WARN Provided project '{full_project_name}' for user '{username}' is not valid. Skipping...") + continue + + project_name = tmp_group_name[0] + project_role = tmp_group_name[1] + + in_project: Optional[Project] = None + + if project_name: # project_name is provided + # check that project exists + for p in all_projects: + if p.shortname == project_name: + in_project = p + + else: # no project_name provided + in_project = project + + if in_project and isinstance(in_project.id, str): + if project_role == "admin": + project_info[in_project.id] = True + else: + project_info[in_project.id] = False + + # create the user + user_status: Optional[str] = user.get("status") + user_status_bool = True + if isinstance(user_status, str): + user_status_bool = json.loads(user_status.lower()) # lower() converts string to boolean + try: + User(con=con, + username=user["username"], + email=user["email"], + givenName=user["givenName"], + familyName=user["familyName"], + password=user["password"], + status=user_status_bool, + lang=user["lang"] if user.get("lang") else "en", + sysadmin=sysadmin, + in_projects=project_info, + in_groups=group_ids).create() + if verbose: + print(f"Created user {username}.") + except BaseError as err: + print(f"ERROR while trying to create user '{username}'. The error message was: {err.message}") + exit(1) + except Exception as exception: + print(f"ERROR while trying to create user '{username}'. The error message was: {exception}") + exit(1) + + def create_ontology(input_file: str, - lists_file: Optional[str], + lists_file: str, server: str, - user: str, + user_mail: str, password: str, verbose: bool, dump: bool) -> None: """ - Creates the ontology from a json input file on a DSP server + Creates the ontology and all its parts from a JSON input file on a DSP server Args: - input_file: The input json file from which the ontology should be created - lists_file: The file which the output (list node ID) is written to + input_file: The input JSON file from which the ontology and its parts should be created + lists_file: The file which the list output (list node ID) is written to server: The DSP server which the ontology should be created on - user: The user which the ontology should be created with - password: The password for the user - verbose: Prints some more information - dump: Dumps test files (json) for DSP API requests if True + user_mail: The user (e-mail) which the ontology should be created with (requesting user) + password: The password for the user (requesting user) + verbose: Prints more information if set to True + dump: Dumps test files (JSON) for DSP API requests if set to True Returns: - True if successful + None """ knora_api_prefix = "knora-api:" @@ -65,22 +329,15 @@ def create_ontology(input_file: str, data_model = json.loads(onto_json_str) - # expand all lists referenced in the list section of the data model - new_lists = expand_lists_from_excel(data_model) - - # add the newly created lists from Excel to the ontology - data_model['project']['lists'] = new_lists + # expand all lists referenced in the list section of the data model and add them to the ontology + data_model["project"]["lists"] = expand_lists_from_excel(data_model) # validate the ontology - if validate_ontology(data_model): - pass - else: + if not validate_ontology(data_model): exit(1) # make the connection to the server - con = login(server=server, - user=user, - password=password) + con = login(server=server, user=user_mail, password=password) if dump: con.start_logging() @@ -88,256 +345,78 @@ def create_ontology(input_file: str, # read the prefixes of external ontologies that may be used context = Context(data_model.get("prefixes") or {}) - # create or update the project + # check if the project exists project = None try: - # try to read the project to check if it exists project = Project(con=con, shortcode=data_model["project"]["shortcode"]).read() + except BaseError: + pass - # update the project with data from the ontology (data_model) - if project.shortname != data_model["project"]["shortname"]: - project.shortname = data_model["project"]["shortname"] - if project.longname != data_model["project"]["longname"]: - project.longname = data_model["project"]["longname"] - project.description = data_model["project"].get("descriptions") - project.keywords = set(data_model["project"].get("keywords")) - updated_project = project.update() - if updated_project is not None: - project = updated_project - if verbose: - print("Modified project:") - project.print() - except: - # create the project if it does not exist - try: - project = Project(con=con, - shortcode=data_model["project"]["shortcode"], - shortname=data_model["project"]["shortname"], - longname=data_model["project"]["longname"], - description=LangString(data_model["project"].get("descriptions")), - keywords=set(data_model["project"].get("keywords")), - selfjoin=False, - status=True).create() - except BaseError as err: - print("Creating project failed: ", err.message) - exit(1) + # if project exists, update it + if project: + print(f"Project '{data_model['project']['shortcode']}' already exists. Updating it...") + updated_project: Project = update_project(project=project, data_model=data_model, verbose=verbose) if verbose: - print("Created project:") - project.print() + updated_project.print() - # create the lists - list_root_nodes = create_lists(input_file, lists_file, server, user, password, verbose) + # if project does not exist, create it + else: + if verbose: + print("Create project...") + project = create_project(con=con, data_model=data_model, verbose=verbose) - # create the group(s) - if verbose: - print("Create groups...") + # create the list(s), skip if it already exists + list_root_nodes = {} + if data_model["project"].get("lists"): + if verbose: + print("Create lists...") + list_root_nodes = create_lists(input_file, lists_file, server, user_mail, password, verbose) + # create the group(s), skip if it already exists new_groups = {} - groups = data_model["project"].get('groups') - if groups: - for group in groups: - try: - new_group = Group(con=con, - name=group["name"], - descriptions=LangString(group["descriptions"]), - project=project, - status=group["status"] if group.get("status") else True, - selfjoin=group["selfjoin"] if group.get("selfjoin") else False).create() - new_groups[new_group.name] = new_group - if verbose: - print("Created group:") - new_group.print() - - except BaseError as err: - print(f"ERROR while trying to create group '{group.name}'. The error message was: {err.message}") - except Exception as exception: - print(f"ERROR while trying to create group '{group.name}'. The error message was: {exception}") - - # create the user(s) - if verbose: - print("Create users...") - all_groups: List[Group] = [] - all_projects: List[Project] = [] - users = data_model["project"].get('users') - if users is not None: - for user in users: - - sysadmin = False - group_ids: Set[str] = set() - - user_groups = user.get("groups") - # if "groups" is provided, add user to the group(s) - if user_groups: - for full_group_name in user_groups: - if verbose: - print(f"Add user to group: {full_group_name}") - # full_group_name has the form [project_shortname]:group_name or SystemAdmin - # if project_shortname is omitted, the group belongs to the current project - tmp = full_group_name.split(':') - - if len(tmp) == 2: - project_shortname = tmp[0] - group_name = tmp[1] - - group = None - if project_shortname: # full_group_name refers to an already existing group on DSP - # get all groups vom DSP - if not all_groups: - all_groups = Group.getAllGroups(con) - - # check that group exists - for g in all_groups: - if g.project.shortname == project_shortname and g.name == group_name: - group = g - - else: # full_group_name refers to a group inside the same ontology - group = new_groups.get(group_name) - assert group is not None - group_ids.add(group.id) - elif len(tmp) == 1: - if tmp[0] == "SystemAdmin": - sysadmin = True - else: - print(f"ERROR Provided group name {full_group_name} for user {user.username} is not valid.") - - project_infos: Dict[str, bool] = {} - - user_projects = user.get("projects") - # if "groups" is provided, add user to the group(s) - if user_projects: - for full_project_name in user_projects: - if verbose: - print(f"Add user to project: {full_project_name}") - # full_project_name has the form [project_name]:member or [project_name]:admin - # if project_name is omitted, the user is added to the current project - tmp = full_project_name.split(':') - - if not len(tmp) == 2: - print(f"ERROR Provided project name {full_project_name} for user {user.username} is not valid.") - continue - - project_name = tmp[0] - project_role = tmp[1] - - in_project = None - - if project_name: # project_name is provided - # get all projects vom DSP - if not all_projects: - all_projects = project.getAllProjects(con) - - # check that project exists - for p in all_projects: - if p.shortname == project_name: - in_project = p - - else: # no project_name provided - in_project = project - - if project_role == "admin": - project_infos[in_project.id] = True - else: - project_infos[in_project.id] = False + if data_model["project"].get("groups"): + if verbose: + print("Create groups...") + new_groups = create_groups(con=con, groups=data_model["project"]["groups"], project=project, verbose=verbose) - user_existing = False - tmp_user = None - try: - tmp_user = User(con, username=user["username"]).read() - except BaseError as err: - pass - if tmp_user is None: - try: - tmp_user = User(con, email=user["email"]).read() - except BaseError as err: - pass - if tmp_user: - # if the user exists already, update his settings - if tmp_user.username != user["username"]: - tmp_user.username = user["username"] - if tmp_user.email != user["email"]: - tmp_user.email = user["email"] - if tmp_user.givenName != user["givenName"]: - tmp_user.givenName = user["givenName"] - if tmp_user.familyName != user["familyName"]: - tmp_user.familyName = user["familyName"] - if tmp_user.password != user["password"]: - tmp_user.password = user["password"] - if user.get("status") and tmp_user.status != user["status"]: - tmp_user.status = user["status"] - if user.get("lang") and tmp_user.lang != user["lang"]: - tmp_user.lang = user["lang"] - if tmp_user.sysadmin != sysadmin: - tmp_user.sysadmin = sysadmin - try: - tmp_user.update() - except BaseError as err: - tmp_user.print() - print("Updating user failed:", err.message) - - # update group and project membership - # Note: memberships are NOT removed here, just added - tmp_in_groups = tmp_user.in_groups - add_groups = group_ids - tmp_in_groups - for g in add_groups: - User.addToGroup(g) - rm_groups = tmp_in_groups - group_ids - # we do no remove a user from a group here! - tmp_in_projects = tmp_user.in_projects - for p in project_infos.items(): - if tmp_in_projects.get(p[0]) and tmp_in_projects[p[0]] == p[1]: - continue - User.addToProject(p[0], p[1]) - else: - # if the user does not exist yet, create him - try: - new_user = User(con=con, - username=user["username"], - email=user["email"], - givenName=user["givenName"], - familyName=user["familyName"], - password=user["password"], - status=user["status"] if user.get("status") is not None else True, - lang=user["lang"] if user.get("lang") is not None else "en", - sysadmin=sysadmin, - in_projects=project_infos, - in_groups=group_ids).create() - except BaseError as err: - print("Creating user failed:", err.message) - if verbose: - print("New user:") - new_user.print() + # create or update the user(s), skip if it already exists + if data_model["project"].get("users"): + if verbose: + print("Create users...") + create_users(con=con, users=data_model["project"]["users"], groups=new_groups, project=project, + verbose=verbose) # create the ontologies if verbose: print("Create ontologies...") - ontologies = data_model.get("project").get("ontologies") - for ontology in ontologies: + for ontology in data_model.get("project").get("ontologies"): new_ontology = None last_modification_date = None + ontology_name = ontology["name"] try: new_ontology = Ontology(con=con, project=project, - label=ontology.get("label"), - name=ontology.get("name")).create() + label=ontology["label"], + name=ontology_name).create() last_modification_date = new_ontology.lastModificationDate if verbose: - print("Created ontology:") - new_ontology.print() + print(f"Created ontology '{ontology_name}'.") except BaseError as err: - print(f"ERROR while trying to create ontology. The error message was {err.message}") + print( + f"ERROR while trying to create ontology '{ontology_name}'. The error message was {err.message}") exit(1) except Exception as exception: - print(f"ERROR while trying to create ontology. The error message was {exception}") + print(f"ERROR while trying to create ontology '{ontology_name}'. The error message was {exception}") exit(1) # add the prefixes defined in the json file - for prefix, iri in context: - if prefix not in new_ontology.context: - s = iri.iri + ("#" if iri.hashtag else "") + for prefix, ontology_info in context: + if prefix not in new_ontology.context and ontology_info: + s = ontology_info.iri + ("#" if ontology_info.hashtag else "") new_ontology.context.add_context(prefix, s) # create the empty resource classes - new_res_classes: Dict[str, ResourceClass] = {} + new_res_classes: dict[str, ResourceClass] = {} for res_class in ontology.get("resources"): res_name = res_class.get("name") super_classes = res_class.get("super") @@ -347,13 +426,15 @@ def create_ontology(input_file: str, res_comment = res_class.get("comments") if res_comment: res_comment = LangString(res_comment) + else: + res_comment = LangString({"en": "[no comment provided]"}) # if no cardinalities are submitted, don't create the class if not res_class.get("cardinalities"): print(f"ERROR while trying to add cardinalities to class '{res_name}'. No cardinalities submitted. At" f"least one direct cardinality is required to create a class with dsp-tools.") continue - new_res_class = None + new_res_class: Optional[ResourceClass] = None try: last_modification_date, new_res_class = ResourceClass(con=con, context=new_ontology.context, @@ -369,12 +450,15 @@ def create_ontology(input_file: str, except Exception as exception: print( f"ERROR while trying to create resource class {res_name}. The error message was {exception}") - new_res_classes[new_res_class.id] = new_res_class - new_ontology.lastModificationDate = last_modification_date - if verbose: - print("Created resource class:") - new_res_class.print() + if new_res_class: + if isinstance(new_res_class.id, str): + new_res_classes[new_res_class.id] = new_res_class + new_ontology.lastModificationDate = last_modification_date + + if verbose: + print("Created resource class:") + new_res_class.print() # create the property classes for prop_class in ontology.get("properties"): @@ -403,12 +487,12 @@ def create_ontology(input_file: str, # - "object_name" : The object is defined in "knora-api" if prop_class.get("object"): - tmp = prop_class.get("object").split(':') - if len(tmp) > 1: - if tmp[0]: + tmp_group_name = prop_class.get("object").split(':') + if len(tmp_group_name) > 1: + if tmp_group_name[0]: prop_object = prop_class.get("object") # fully qualified name else: - prop_object = new_ontology.name + ':' + tmp[1] # object refers to actual ontology + prop_object = new_ontology.name + ':' + tmp_group_name[1] # object refers to actual ontology else: prop_object = knora_api_prefix + prop_class.get("object") # object refers to knora-api else: @@ -418,11 +502,11 @@ def create_ontology(input_file: str, gui_attributes = prop_class.get("gui_attributes") if gui_attributes and gui_attributes.get("hlist"): gui_attributes["hlist"] = "<" + list_root_nodes[gui_attributes["hlist"]]["id"] + ">" - prop_comment = prop_class.get("comment") + prop_comment = prop_class.get("comments") if prop_comment: prop_comment = LangString(prop_comment) else: - prop_comment = "no comment given" + prop_comment = LangString({"en": "[no comment provided]"}) new_prop_class = None try: @@ -446,12 +530,13 @@ def create_ontology(input_file: str, print( f"ERROR while trying to create property class {prop_name}. The error message was: {exception}") - new_ontology.lastModificationDate = last_modification_date - if verbose: - print("Created property:") - new_prop_class.print() + if new_prop_class: + new_ontology.lastModificationDate = last_modification_date + if verbose: + print("Created property:") + new_prop_class.print() - # Add cardinality/ies to class + # Add cardinalities to class switcher = { "1": Cardinality.C_1, "0-1": Cardinality.C_0_1, @@ -462,31 +547,33 @@ def create_ontology(input_file: str, for res_class in ontology.get("resources"): if res_class.get("cardinalities"): for card_info in res_class.get("cardinalities"): - rc = new_res_classes.get(new_ontology.id + '#' + res_class.get("name")) + rc = new_res_classes.get(new_ontology.id + "#" + res_class.get("name")) cardinality = switcher[card_info.get("cardinality")] prop_name_for_card = card_info.get("propname") - tmp = prop_name_for_card.split(":") - if len(tmp) > 1: - if tmp[0]: + tmp_group_name = prop_name_for_card.split(":") + if len(tmp_group_name) > 1: + if tmp_group_name[0]: prop_id = prop_name_for_card # fully qualified name else: - prop_id = new_ontology.name + ":" + tmp[1] # prop name refers to actual ontology + prop_id = new_ontology.name + ":" + tmp_group_name[1] # prop name refers to actual ontology else: prop_id = knora_api_prefix + prop_name_for_card # prop name refers to knora-api - try: - last_modification_date = rc.addProperty( - property_id=prop_id, - cardinality=cardinality, - gui_order=card_info.get("gui_order"), - last_modification_date=last_modification_date) - except BaseError as err: - print( - f"ERROR while trying to add cardinality {prop_id} to resource class {res_class.get('name')}." - f"The error message was {err.message}") - except Exception as exception: - print( - f"ERROR while trying to add cardinality {prop_id} to resource class {res_class.get('name')}." - f"The error message was {exception}") - - new_ontology.lastModificationDate = last_modification_date + if rc: + try: + last_modification_date = rc.addProperty( + property_id=prop_id, + cardinality=cardinality, + gui_order=card_info.get("gui_order"), + last_modification_date=last_modification_date) + + except BaseError as err: + print( + f"ERROR while trying to add cardinality {prop_id} to resource class {res_class.get('name')}." + f"The error message was {err.message}") + except Exception as exception: + print( + f"ERROR while trying to add cardinality {prop_id} to resource class {res_class.get('name')}." + f"The error message was {exception}") + + new_ontology.lastModificationDate = last_modification_date diff --git a/knora/dsplib/utils/onto_get.py b/knora/dsplib/utils/onto_get.py index 7fd59fb77..f3ce8bc45 100644 --- a/knora/dsplib/utils/onto_get.py +++ b/knora/dsplib/utils/onto_get.py @@ -1,6 +1,5 @@ import json import re -from typing import Dict from ..models.connection import Connection from ..models.group import Group @@ -83,7 +82,7 @@ def get_ontology(project_identifier: str, outfile: str, server: str, user: str, if verbose: print(f"Getting ontologies...") project_obj["ontologies"] = [] - prefixes: Dict[str, str] = {} + prefixes: dict[str, str] = {} ontologies = Ontology.getProjectOntologies(con, project.id) ontology_ids = [onto.id for onto in ontologies] for ontology_id in ontology_ids: @@ -103,4 +102,4 @@ def get_ontology(project_identifier: str, outfile: str, server: str, user: str, } with open(outfile, 'w', encoding='utf8') as outfile: - json.dump(ontology_json, outfile, indent=3, ensure_ascii=False) + json.dump(ontology_json, outfile, indent=4, ensure_ascii=False) diff --git a/knora/dsplib/utils/xml_upload.py b/knora/dsplib/utils/xml_upload.py index 42c38f4a0..60261b6a0 100644 --- a/knora/dsplib/utils/xml_upload.py +++ b/knora/dsplib/utils/xml_upload.py @@ -5,7 +5,7 @@ import os from datetime import datetime from pathlib import Path -from typing import Dict, List, Optional, Union +from typing import Optional, Union from lxml import etree @@ -34,24 +34,24 @@ class ProjectContext: """Represents the project context""" _projects: list[Project] - _project_map: Dict[str, str] # dictionary of (project name:project IRI) pairs - _inv_project_map: Dict[str, str] # dictionary of (project IRI:project name) pairs + _project_map: dict[str, str] # dictionary of (project name:project IRI) pairs + _inv_project_map: dict[str, str] # dictionary of (project IRI:project name) pairs _groups: Optional[list[Group]] - _group_map: Optional[Dict[str, str]] + _group_map: Optional[dict[str, str]] _shortcode: Optional[str] _project_name: Optional[str] def __init__(self, con: Connection, shortcode: Optional[str] = None): self._shortcode = shortcode self._projects = Project.getAllProjects(con=con) - self._project_map: Dict[str, str] = {x.shortname: x.id for x in self._projects} - self._inv_project_map: Dict[str, str] = {x.id: x.shortname for x in self._projects} + self._project_map: dict[str, str] = {x.shortname: x.id for x in self._projects} + self._inv_project_map: dict[str, str] = {x.id: x.shortname for x in self._projects} try: self._groups = Group.getAllGroups(con=con) except BaseError: self._groups = None if self._groups: - self._group_map: Dict[str, str] = {self._inv_project_map[x.project] + ':' + x.name: x.id for x in + self._group_map: dict[str, str] = {self._inv_project_map[x.project] + ':' + x.name: x.id for x in self._groups} else: self._group_map = None @@ -64,7 +64,7 @@ def __init__(self, con: Connection, shortcode: Optional[str] = None): break @property - def group_map(self) -> Dict[str, str]: + def group_map(self) -> dict[str, str]: """Dictionary of (project:group name) and (group id) pairs of all groups in project""" return self._group_map @@ -103,7 +103,7 @@ class XMLValue: """Represents a value of a resource property in the XML used for data import""" _value: Union[str, KnoraStandoffXml] - _resrefs: Optional[List[str]] + _resrefs: Optional[list[str]] _comment: str _permissions: str _is_richtext: bool @@ -137,7 +137,7 @@ def value(self) -> Union[str, KnoraStandoffXml]: return self._value @property - def resrefs(self) -> Optional[List[str]]: + def resrefs(self) -> Optional[list[str]]: """List of resource references""" return self._resrefs @@ -171,7 +171,7 @@ class XMLProperty: _name: str _valtype: str - _values: List[XMLValue] + _values: list[XMLValue] def __init__(self, node: etree.Element, valtype: str, default_ontology: Optional[str] = None): """ @@ -214,7 +214,7 @@ def valtype(self) -> str: return self._valtype @property - def values(self) -> List[XMLValue]: + def values(self) -> list[XMLValue]: """List of values of this property""" return self._values @@ -233,7 +233,7 @@ class XMLResource: _restype: str _permissions: Optional[str] _bitstream: Optional[XMLBitstream] - _properties: List[XMLProperty] + _properties: list[XMLProperty] def __init__(self, node: etree.Element, default_ontology: Optional[str] = None) -> None: """ @@ -305,14 +305,14 @@ def print(self) -> None: for prop in self._properties: prop.print() - def get_resptrs(self) -> List[str]: + def get_resptrs(self) -> list[str]: """ Get a list of all resource id's that are referenced by this resource Returns: List of resources identified by their unique id's (as given in the XML) """ - resptrs: List[str] = [] + resptrs: list[str] = [] for prop in self._properties: if prop.valtype == 'resptr': for value in prop.values: @@ -323,7 +323,7 @@ def get_resptrs(self) -> List[str]: resptrs.extend(value.resrefs) return resptrs - def get_propvals(self, resiri_lookup: Dict[str, str], permissions_lookup: Dict[str, Permissions]) -> Dict[ + def get_propvals(self, resiri_lookup: dict[str, str], permissions_lookup: dict[str, Permissions]) -> dict[ str, Permissions]: """ Get a dictionary of the property names and their values belonging to a resource @@ -338,7 +338,7 @@ def get_propvals(self, resiri_lookup: Dict[str, str], permissions_lookup: Dict[s """ prop_data = {} for prop in self._properties: - vals: List[Union[str, Dict[str, str]]] = [] + vals: list[Union[str, dict[str, str]]] = [] for value in prop.values: if prop.valtype == 'resptr': # we have a resptr, therefore simple lookup or IRI iri = resiri_lookup.get(value.value) @@ -371,8 +371,8 @@ def get_propvals(self, resiri_lookup: Dict[str, str], permissions_lookup: Dict[s prop_data[prop.name] = vals if len(vals) > 1 else vals[0] return prop_data - def get_bitstream(self, internal_file_name_bitstream: str, permissions_lookup: Dict[str, Permissions]) -> Optional[ - Dict[str, Union[str, Permissions]]]: + def get_bitstream(self, internal_file_name_bitstream: str, permissions_lookup: dict[str, Permissions]) -> Optional[ + dict[str, Union[str, Permissions]]]: """ Get the bitstream object belonging to the resource @@ -449,7 +449,7 @@ class XmlPermission: """Represents the permission set containing several XmlAllow elements in the XML used for data import""" _id: str - _allows: List[XmlAllow] + _allows: list[XmlAllow] def __init__(self, node: etree.Element, project_context: ProjectContext) -> None: """ @@ -470,7 +470,7 @@ def id(self) -> str: return self._id @property - def allows(self) -> List[XmlAllow]: + def allows(self) -> list[XmlAllow]: """List of XmlAllow elements defining permissions for specific groups""" return self._allows @@ -482,7 +482,7 @@ def get_permission_instance(self) -> Permissions: return permissions def __str__(self): - allow_str: List[str] = [] + allow_str: list[str] = [] for allow in self._allows: allow_str.append("{} {}".format(allow.permission, allow.group)) return '|'.join(allow_str) @@ -494,7 +494,7 @@ def print(self): a.print() -def do_sort_order(resources: List[XMLResource], verbose) -> List[XMLResource]: +def do_sort_order(resources: list[XMLResource], verbose) -> list[XMLResource]: """ Sorts a list of resources. @@ -511,11 +511,11 @@ def do_sort_order(resources: List[XMLResource], verbose) -> List[XMLResource]: if verbose: print("Checking resources for unresolvable references...") - + # sort the resources according to outgoing resptrs - ok_resources: List[XMLResource] = [] - nok_resources: List[XMLResource] = [] - ok_res_ids: List[str] = [] + ok_resources: list[XMLResource] = [] + nok_resources: list[XMLResource] = [] + ok_res_ids: list[str] = [] cnt = 0 nok_len = 9999999 while len(resources) > 0 and cnt < 10000: @@ -611,8 +611,8 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s con.login(user, password) proj_context = ProjectContext(con=con) - resources: List[XMLResource] = [] - permissions: Dict[str, XmlPermission] = {} + resources: list[XMLResource] = [] + permissions: dict[str, XmlPermission] = {} # parse the XML file containing the data tree = etree.parse(input_file) @@ -654,16 +654,16 @@ def xml_upload(input_file: str, server: str, user: str, password: str, imgdir: s project = ResourceInstanceFactory(con, shortcode) # create a dictionary to look up permissions - permissions_lookup: Dict[str, Permissions] = {} + permissions_lookup: dict[str, Permissions] = {} for key, perm in permissions.items(): permissions_lookup[key] = perm.get_permission_instance() # create a dictionary to look up resource classes - res_classes: Dict[str, type] = {} + res_classes: dict[str, type] = {} for res_class_name in project.get_resclass_names(): res_classes[res_class_name] = project.get_resclass(res_class_name) - res_iri_lookup: Dict[str, str] = {} + res_iri_lookup: dict[str, str] = {} failed_uploads = [] for resource in resources: diff --git a/test/e2e/test_tools.py b/test/e2e/test_tools.py index a9fa76f10..46e397add 100644 --- a/test/e2e/test_tools.py +++ b/test/e2e/test_tools.py @@ -162,7 +162,7 @@ def test_create_ontology(self) -> None: create_ontology(input_file=self.test_onto_file, lists_file='lists-out.json', server=self.server, - user=self.user, + user_mail=self.user, password='test', verbose=False, dump=True)