We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Hello,
The tags feature is not yet supported by this lib. Since I need this feature for a project, I have added it as a plugin for now. The code is below.
Shall I fork the project and open a PR with this change? (I don't know if the project is still active.)
import re
import requests
import json
from nextcloud import NextCloud as _Nextcloud
from nextcloud.api_wrappers import OCS_API_CLASSES
from nextcloud.api_wrappers.webdav import File, WebDAV
from nextcloud.base import WithRequester
from nextcloud.requester import WebDAVRequester, WebDAVResponse
from xml.etree import ElementTree as ET

# Template shared by every PROPFIND request built in this module; ``{0}`` is
# replaced by the list of requested property elements.
XML_REQUEST_PROPFIND_TEMPLATE = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns" xmlns:nc="http://nextcloud.org/ns">
  <d:prop>
    {0}
  </d:prop>
</d:propfind>
"""


def build_xml_propfind_query(oc_fields=None, d_fields=None, nc_fields=None):
    """Build the XML body of a PROPFIND request.

    Args:
        oc_fields (iterable of str): property names requested as ``<oc:name />``.
        d_fields (iterable of str): property names requested as ``<d:name />``.
        nc_fields (iterable of str): property names requested as ``<nc:name />``
            (the template already declares the ``nc`` namespace).

    Returns:
        str: the filled-in ``XML_REQUEST_PROPFIND_TEMPLATE``; ``d`` properties
        come first, then ``oc``, then ``nc``.
    """
    props = []
    for prefix, fields in (("d", d_fields), ("oc", oc_fields), ("nc", nc_fields)):
        # ``None`` (the default) means "no property in this namespace".
        props.extend("<{}:{} />".format(prefix, field) for field in fields or ())
    return XML_REQUEST_PROPFIND_TEMPLATE.format("".join(props))


# --- Stick new classes to nextcloud_api lib ---

# Accept for POST the same status codes that the library accepts for PUT.
WebDAVResponse.METHODS_SUCCESS_CODES['POST'] = WebDAVResponse.METHODS_SUCCESS_CODES['PUT']


class WebDAVExtented(WebDAV):
    """WebDAV wrapper extended with a single-property lookup."""

    def get_file_property(self, uid, path, field, tag='oc'):
        """Fetch one property of a file or folder with a ``Depth: 0`` PROPFIND.

        Args:
            uid (str): first segment of the requested URL (user id).
            path (str): path appended after ``uid``.
            field (str): property name, without namespace prefix.
            tag (str): namespace prefix of the property (``'oc'``, ``'d'`` or
                ``'nc'``); any other value is requested as ``oc`` like the
                default.

        Returns:
            The requester response. ``resp.data`` is the property text, or
            ``None`` when the request failed or the property was not found.
        """
        get_file_prop_xpath = '{DAV:}propstat/d:prop/%s:%s' % (tag, field)
        # Request the property in the same namespace the xpath searches in;
        # otherwise a non-``oc`` tag could never match the answer.
        fields_kwarg = {'d': 'd_fields', 'nc': 'nc_fields'}.get(tag, 'oc_fields')
        data = build_xml_propfind_query(**{fields_kwarg: [field]})
        additional_url = "{}/{}".format(uid, path)
        resp = self.requester.propfind(
            additional_url=additional_url,
            headers={"Depth": str(0)},
            data=data)
        response_data = resp.data
        resp.data = None
        if not resp.is_ok:
            return resp

        response_xml_data = ET.fromstring(response_data)
        for xml_data in response_xml_data:
            for prop in xml_data.findall(get_file_prop_xpath,
                                         File.xml_namespaces_map):
                resp.data = prop.text
            # Only the first child of the answer is inspected.
            break
        return resp


class NextCloud(_Nextcloud):
    """Nextcloud client that also exposes the system-tags helpers below."""

    def __init__(self, endpoint, user, password, json_output=True):
        """Create the base client, then graft the extra wrappers onto it.

        Every public callable of ``WebDAVExtented``, ``SystemTags`` and
        ``SystemTagsRelation`` is copied onto this instance as an attribute.
        """
        super().__init__(endpoint, user, password, json_output=json_output)
        webdav_requester = WebDAVRequester(endpoint, user, password)
        functionality_classes = [
            WebDAVExtented(webdav_requester, json_output=json_output),
            SystemTags(webdav_requester, json_output=json_output),
            # The relation wrapper needs the client itself to resolve both
            # tags and files.
            SystemTagsRelation(webdav_requester,
                               json_output=json_output, client=self),
        ]
        self.functionality_classes += functionality_classes
        for functionality_class in functionality_classes:
            for potential_method in dir(functionality_class):
                if potential_method.startswith('_'):
                    continue
                candidate = getattr(functionality_class, potential_method)
                if not callable(candidate):
                    continue
                setattr(self, potential_method, candidate)


# --- Define properties models ---

class Prop:
    """Description of one DAV property and of its alternative names."""

    def __init__(self, xml_name, json=None, default=None):
        """
        Args:
            xml_name (str): property name as written in XML (may contain '-').
            json (str): key used for this property in JSON payloads, if any.
            default: default value used by ``AttributeCollection.default_get``.
        """
        self.attr_name = self._xml_name_to_py_name(xml_name)
        self.json_key = json
        self.xml_key = xml_name
        self.default_val = default

    @staticmethod
    def _xml_name_to_py_name(name):
        """Turn an XML property name into an attribute name ('-' -> '_')."""
        return name.replace('-', '_')

    @staticmethod
    def _py_name_to_xml_name(name):
        """Turn an attribute name into an XML property name ('_' -> '-')."""
        return name.replace('_', '-')


class DProp(Prop):
    """Property flagged with type ``'d'``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.type = 'd'


class OCProp(Prop):
    """Property flagged with type ``'oc'``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.type = 'oc'


# --- Define generic result class that could be common to File and Tag ---

class AttributeCollection:
    """Object built from one child element of a PROPFIND answer.

    Subclasses list the properties they care about in ``_attrs``; each one
    becomes an instance attribute (``None`` when absent from the answer).
    """

    SUCCESS_STATUS = 'HTTP/1.1 200 OK'
    COLLECTION_RESOURCE_TYPE = 'collection'
    _attrs = []
    _xml_namespaces_map = File.xml_namespaces_map

    @property
    def _fields(self):
        """Attribute (Python) names of the declared properties."""
        return [v.attr_name for v in self._attrs]

    @property
    def _properties(self):
        """XML names of the declared properties."""
        return [v.xml_key for v in self._attrs]

    def __init__(self, xml_data):
        """Read ``href`` and the declared properties out of ``xml_data``."""
        self.href = xml_data.find('d:href', self._xml_namespaces_map).text
        for attr in self._attrs:
            setattr(self, attr.attr_name, None)
        known_properties = set(self._properties)
        for propstat in xml_data.iter('{DAV:}propstat'):
            status = propstat.find('d:status', self._xml_namespaces_map).text
            # Skip the propstat groups that do not report success.
            if status != self.SUCCESS_STATUS:
                continue
            for xml_property in propstat.find('d:prop', self._xml_namespaces_map):
                # Drop the '{namespace}' prefix ElementTree puts on tags.
                property_name = re.sub("{.*}", "", xml_property.tag)
                if property_name not in known_properties:
                    continue
                value = self._get_property_value(xml_property)
                setattr(self, Prop._xml_name_to_py_name(property_name), value)

    @classmethod
    def _get_property_value(cls, xml_property):
        """Return the value stored for a property element (its text)."""
        return xml_property.text

    @classmethod
    def default_get(cls, key_format='json', **kwargs):
        """Build a dict of default values keyed by ``<key_format>_key``.

        Properties without a key in the requested format are left out.
        ``kwargs`` are merged last and override the defaults.
        """
        key_attr = "%s_key" % key_format
        vals = {getattr(v, key_attr): v.default_val
                for v in cls._attrs
                if getattr(v, key_attr, False)}
        vals.update(kwargs)
        return vals

    @classmethod
    def build_xml_propfind_query(cls, fields=None):
        """Build a PROPFIND body asking for ``fields`` as ``oc`` properties.

        All declared properties are requested when ``fields`` is empty.
        """
        if not fields:
            fields = [attr.xml_key for attr in cls._attrs]
        return build_xml_propfind_query(oc_fields=fields)

    @classmethod
    def from_response(cls, resp, json_output=None, filtered=None):
        """Replace ``resp.data`` with the objects parsed from the answer.

        Args:
            resp: requester response whose ``data`` is the XML answer.
            json_output (bool): store dicts (``as_dict``) instead of objects.
            filtered (callable): optional predicate; only objects for which
                it returns a true value are kept.

        Returns:
            The same response; ``data`` is ``None`` when it is not ok.
        """
        if not resp.is_ok:
            resp.data = None
            return resp

        response_xml_data = ET.fromstring(resp.data)
        attr_datas = [cls(xml_data) for xml_data in response_xml_data]
        if filtered and callable(filtered):
            attr_datas = [attr_data for attr_data in attr_datas
                          if filtered(attr_data)]
        if json_output:
            resp.data = [attr_data.as_dict() for attr_data in attr_datas]
        else:
            resp.data = attr_datas
        return resp

    def as_dict(self):
        """Return the declared properties of this object as a dict."""
        attrs = {v.attr_name for v in self._attrs}
        return {key: value for key, value in self.__dict__.items()
                if key in attrs}


# --- Define main processings ---

class Tag(AttributeCollection):
    """System tag as returned by the ``systemtags`` endpoints."""

    _attrs = [
        OCProp("id"),
        OCProp("display-name", json="name", default='default_tag_name'),
        OCProp("user-visible", json="userVisible", default=True),
        OCProp("can-assign", json="canAssign", default=True),
        OCProp("user-assignable", json="userAssignable", default=True),
    ]


class SystemTags(WithRequester):
    """Wrapper around ``/remote.php/dav/systemtags``."""

    API_URL = '/remote.php/dav/systemtags'
    CREATED_CODE = 201

    def __init__(self, *args, **kwargs):
        super(SystemTags, self).__init__(*args)
        self.json_output = kwargs.get('json_output')

    def get_sytemtag(self, name, fields=None, json_output=None):
        """Get the tag(s) whose display name is ``name``.

        Args:
            name (str): display name to look for.
            fields (iterable of str): properties to fetch; attribute-style
                names ('display_name') and XML names ('display-name') are both
                accepted. All properties of ``Tag`` are fetched by default.
            json_output (bool): overrides the instance setting when not None.

        Returns:
            response with the list of matching tags in data
        """
        if not fields:
            # NOTE: ``Tag._fields`` cannot be used here: on the class it is a
            # property object, not a list.
            fields = [attr.xml_key for attr in Tag._attrs]
        # 'display-name' is always needed by the filter below; dict.fromkeys
        # removes duplicates while keeping the order stable.
        wanted = dict.fromkeys(
            ['display-name', *(Prop._py_name_to_xml_name(f) for f in fields)])
        resp = self.requester.propfind(
            data=Tag.build_xml_propfind_query(list(wanted)))
        if json_output is None:
            json_output = self.json_output
        return Tag.from_response(
            resp, json_output=json_output,
            filtered=lambda t: t.display_name == name)

    # Correctly spelled alias; the original name is kept for compatibility.
    get_systemtag = get_sytemtag

    def get_systemtags(self, name=None):
        """
        Get list of all tags

        Args:
            name: currently unused.

        Returns:
            response with <list>Tag in data
        """
        resp = self.requester.propfind(data=Tag.build_xml_propfind_query())
        return Tag.from_response(resp, json_output=self.json_output)

    def create_systemtag(self, name, **kwargs):
        """
        Create a new system tag from name.

        Args:
            name (str): tag name
            **kwargs: other JSON keys of the tag, overriding the defaults
                declared on ``Tag``.

        Returns:
            requester response; when the answer status is ``CREATED_CODE`` its
            data is the tag id read from the 'Content-Location' header
        """
        data = Tag.default_get(name=name, **kwargs)
        url = self.requester.get_full_url()
        # requests is used directly because headers can't be set in
        # requester.post
        res = requests.post(
            url,
            auth=self.requester.auth_pk,
            data=json.dumps(data),
            headers={"Content-Type": "application/json"})
        resp = self.requester.rtn(res)
        if res.status_code == self.CREATED_CODE:
            resp.data = int(res.headers['Content-Location'].split('/')[-1])
        return resp

    def delete_systemtag(self, name=None, tag_id=None):
        """
        Delete systemtag

        Args:
            name (str): tag name
            OR
            tag_id (int): tag id

        Returns:
            response of the deletion, or the response of the lookup by name
            when no tag id could be determined
        """
        if not tag_id:
            resp = self.get_sytemtag(name, ['id'], json_output=False)
            if resp.data:
                tag_id = resp.data[0].id
        if tag_id:
            resp = self.requester.delete(url=str(tag_id))
        return resp


class FluidArgumentsMixin:
    """
    Mixin resolving missing arguments through ``_default_get_<name>`` hooks.

    It avoids code repetition at the price of dynamic ``getattr`` lookups,
    and of losing some information about the intermediate requests.
    """

    def _arguments_get(self, varnames, vals):
        """Return the values of ``varnames``, computing the missing ones.

        Args:
            varnames (list of str): names of the wanted values, in order.
            vals (dict): known values; a nested 'kwargs' dict is merged in.

        Returns:
            list: one value per name. A value that is ``None`` is replaced by
            ``self._default_get_<name>(vals)`` when that method exists.
        """
        # Work on a copy so the caller's mapping is never modified.
        vals = dict(vals)
        extra = vals.get('kwargs')
        if extra:
            vals.update(extra)
        ret = []
        for varname in varnames:
            val = vals.get(varname, None)
            if val is None:
                getter = getattr(self, '_default_get_%s' % varname, None)
                if getter is not None:
                    val = getter(vals)
            ret.append(val)
        return ret


class SystemTagsRelation(WithRequester, FluidArgumentsMixin):
    """Wrapper around ``/remote.php/dav/systemtags-relations/files``."""

    API_URL = '/remote.php/dav/systemtags-relations/files'

    def __init__(self, *args, **kwargs):
        super(SystemTagsRelation, self).__init__(*args)
        self.json_output = kwargs.get('json_output')
        # Client used to look up file ids and tags.
        self.client = kwargs.get('client')

    def _get_fileid_from_path(self, uid, path):
        """Return the 'fileid' property of a file as int, None if missing."""
        resp = self.client.get_file_property(uid, path, 'fileid')
        if resp.data:
            return int(resp.data)
        return None

    def _get_systemtag_id_from_name(self, name):
        """Return the id of the first tag named ``name``, None if missing."""
        resp = self.client.get_sytemtag(name, ['id'], json_output=False)
        if resp.data:
            return int(resp.data[0].id)
        return None

    def _default_get_file_id(self, vals):
        """Resolve the file id from the 'uid' and 'path' entries of ``vals``.

        Raises:
            ValueError: when 'uid' or 'path' is missing.
        """
        uid = vals.get('uid', None)
        path = vals.get('path', None)
        if not (uid and path):
            raise ValueError("Insufficient infos about the file")
        return self._get_fileid_from_path(uid, path)

    def _default_get_tag_id(self, vals):
        """Resolve the tag id from the 'tag_name' entry of ``vals``.

        Raises:
            ValueError: when 'tag_name' is missing.
        """
        tag_name = vals.get('tag_name', None)
        if not tag_name:
            raise ValueError("Insufficient infos about the tag")
        return self._get_systemtag_id_from_name(tag_name)

    def get_systemtags_relation(self, file_id=None, **kwargs):
        """
        Get all tags from a given file/folder

        Args:
            file_id (int): file id found from file object
            OR
            uid (str): user (to know from where fetch file)
            path (str): path to file/folder

        Returns:
            response with <list>Tag in data

        Raises:
            ValueError: when neither file_id nor uid and path are given
        """
        (file_id,) = self._arguments_get(
            ['file_id'], dict(kwargs, file_id=file_id))
        data = Tag.build_xml_propfind_query()
        resp = self.requester.propfind(additional_url=file_id, data=data)
        return Tag.from_response(resp, json_output=self.json_output)

    def delete_systemtags_relation(self, file_id=None, tag_id=None, **kwargs):
        """
        Delete a tag from a given file/folder

        Args:
            file_id (int): id found in file object
            tag_id (int): id found in tag object
            (if you didn't provided file_id)
            uid (str): user (to know from where fetch file)
            path (str): path to file/folder
            (if you didn't provided tag_id)
            tag_name (str): tag_name to search

        Returns:
            response of the DELETE request

        Raises:
            ValueError: when the file or the tag is not described
        """
        (file_id, tag_id) = self._arguments_get(
            ['file_id', 'tag_id'],
            dict(kwargs, file_id=file_id, tag_id=tag_id))
        resp = self.requester.delete(url="{}/{}".format(file_id, tag_id))
        return resp

    def add_systemtags_relation(self, file_id=None, tag_id=None, **kwargs):
        """
        Set a tag on a given file/folder

        Args:
            file_id (int): id found in file object
            tag_id (int): id found in tag object
            (if you didn't provided file_id)
            uid (str): user (to know from where fetch file)
            path (str): path to file/folder
            (if you didn't provided tag_id)
            tag_name (str): tag_name to search or create

        Returns:
            response of the PUT request, or the response of the tag creation
            when that creation failed

        Raises:
            ValueError: when the file or the tag is not described, or when
                no file id could be found
        """
        (file_id, tag_id) = self._arguments_get(
            ['file_id', 'tag_id'],
            dict(kwargs, file_id=file_id, tag_id=tag_id))
        if not tag_id and 'tag_name' in kwargs:
            # No tag with that name exists yet: create it.
            resp = self.client.create_systemtag(kwargs['tag_name'])
            if not resp.is_ok:
                return resp
            tag_id = resp.data
        if not file_id:
            raise ValueError('No file found')
        resp = self.requester.put(url="{}/{}".format(file_id, tag_id))
        return resp
The text was updated successfully, but these errors were encountered:
No branches or pull requests
Hello,
The tags feature is not yet supported by this lib.
Since I need this feature for a project, I have added it as a plugin for now.
Below the code.
Shall I fork the project and open a PR with this change? (I don't know if the project is still active.)
The text was updated successfully, but these errors were encountered: