How to use is_private method in Nose

Best Python code snippet using nose

DataLayerOperator.py

Source:DataLayerOperator.py Github

copy

Full Screen

# Copyright 2020 The KNIX Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from DataLayerClient import DataLayerClient


class DataLayerOperator:
    '''
    Front end for data layer operations (key-value, maps, sets, counters).

    Every operation takes an `is_private` flag that selects between two lazily
    created DataLayerClient instances (see `_get_data_layer_client`).

    Key-value `put`/`delete` calls made with `is_queued=True` are only recorded
    in in-memory dicts, which `get` consults before asking the data layer.
    For maps, sets and counters the queued path is not implemented yet: a call
    with `is_queued=True` currently does nothing.
    '''

    def __init__(self, suid, sid, wid, datalayer):
        self._storage_userid = suid
        self._sandboxid = sid
        self._workflowid = wid
        self._datalayer = datalayer

        # global data layer clients for either workflow-private data or user storage
        self._data_layer_client = None
        self._data_layer_client_private = None

        # TODO (?): use the local data layer for operations regarding KV, maps, sets and counters instead of in-memory data structures (e.g., transient_data_output)
        # and store the operations/data for is_queued = True operations,
        # so that we can synchronize it with the global data layer

        # (key, value) store
        self.transient_data_output = {}
        self.transient_data_output_private = {}

        self.data_to_be_deleted = {}
        self.data_to_be_deleted_private = {}

        self.map_output = {}
        self.set_output = {}
        self.counter_output = {}

        self.map_output_delete = {}
        self.set_output_delete = {}
        self.counter_output_delete = {}

    # TODO: update to use local data layer for (key, value) operations
    def put(self, key, value, is_private=False, is_queued=False, table=None):
        '''
        Store `value` under `key`.

        Queued: record the value in the transient output dict and cancel any
        queued delete for the same key. Not queued: write through the data
        layer client, passing `table` as `tableName`.
        '''
        if is_queued:
            if is_private:
                self.transient_data_output_private[key] = value
                # pop() with a default is a no-op when the key is absent
                self.data_to_be_deleted_private.pop(key, None)
            else:
                self.transient_data_output[key] = value
                self.data_to_be_deleted.pop(key, None)
        else:
            data_layer_client = self._get_data_layer_client(is_private)
            data_layer_client.put(key, value, tableName=table)

    def get(self, key, is_private=False, table=None):
        '''
        Return the value for `key`.

        Lookup order: a queued delete yields "", then the transient output,
        then the data layer client (with `table` passed as `tableName`).
        '''
        # check first transient_output
        # if not there, return the actual (global) data layer data item
        # if not there either, return empty string (as defined in the DataLayerClient)
        if is_private:
            deleted = self.data_to_be_deleted_private
            transient = self.transient_data_output_private
        else:
            deleted = self.data_to_be_deleted
            transient = self.transient_data_output

        if key in deleted:
            return ""

        # if the put() or delete() were called with is_queued=False (default),
        # then the below lookup will still result in 'value is None'
        # if not, then value will be obtained from the transient output
        value = transient.get(key)
        if value is None:
            data_layer_client = self._get_data_layer_client(is_private)
            value = data_layer_client.get(key, tableName=table)
        return value

    def delete(self, key, is_private=False, is_queued=False, table=None):
        '''
        Delete `key`.

        Queued: drop any transient value and mark the key as to-be-deleted.
        Not queued: delete through the data layer client.
        '''
        if is_queued:
            if is_private:
                self.transient_data_output_private.pop(key, None)
                self.data_to_be_deleted_private[key] = True
            else:
                self.transient_data_output.pop(key, None)
                self.data_to_be_deleted[key] = True
        else:
            data_layer_client = self._get_data_layer_client(is_private)
            data_layer_client.delete(key, tableName=table)

    def getKeys(self, start_index, end_index, is_private=False):
        '''Return, as a list, the keys reported by the client's listKeys().'''
        keys = set()
        # XXX: should follow "read your writes"
        # the final result should include:
        # 1. all created locally
        # 2. all existing globally minus the ones deleted locally

        # TODO: 1. check local data layer first: get locally created and deleted

        # 2. retrieve all existing globally
        dlc = self._get_data_layer_client(is_private)
        m2 = dlc.listKeys(start_index, end_index)
        if m2 is not None:
            # TODO: 3. remove the ones deleted locally
            keys = keys.union(m2)

        return list(keys)

    # map operations
    def createMap(self, mapname, is_private=False, is_queued=False):
        '''Create map `mapname` via the client; no-op when `is_queued`.'''
        if is_queued:
            # TODO: use transient data structure in memory when the operation is queued
            return
        dlc = self._get_data_layer_client(is_private)
        dlc.createMap(mapname)

    def putMapEntry(self, mapname, key, value, is_private=False, is_queued=False):
        '''Put `key` -> `value` into map `mapname`; no-op when `is_queued`.'''
        if is_queued:
            # TODO: use transient data structure in memory when the operation is queued
            return
        dlc = self._get_data_layer_client(is_private)
        dlc.putMapEntry(mapname, key, value)

    def getMapEntry(self, mapname, key, is_private=False):
        '''Return the client's value for `key` in map `mapname`.'''
        # TODO: check transient data structure first
        dlc = self._get_data_layer_client(is_private)
        return dlc.getMapEntry(mapname, key)

    def deleteMapEntry(self, mapname, key, is_private=False, is_queued=False):
        '''Delete `key` from map `mapname`; no-op when `is_queued`.'''
        if is_queued:
            # TODO: use transient data structure in memory when the operation is queued
            return
        dlc = self._get_data_layer_client(is_private)
        dlc.deleteMapEntry(mapname, key)

    def containsMapKey(self, mapname, key, is_private=False):
        '''Return the client's answer to whether map `mapname` contains `key`.'''
        # TODO: check transient data structure first
        dlc = self._get_data_layer_client(is_private)
        return dlc.containsMapKey(mapname, key)

    def retrieveMap(self, mapname, is_private=False):
        '''Return a new dict with the entries the client reports for `mapname`.'''
        retmap = {}
        # XXX: should follow "read your writes"
        # the final result should include:
        # 1. all created locally
        # 2. all existing globally minus the ones deleted locally

        # TODO: 1. check local data layer first: get locally created and deleted

        # 2. retrieve all existing globally
        dlc = self._get_data_layer_client(is_private)
        retmap2 = dlc.retrieveMap(mapname)
        if retmap2 is not None:
            for k in retmap2:
                retmap[k] = retmap2[k]
            # TODO: 3. remove the ones deleted locally

        return retmap

    def getMapKeys(self, mapname, is_private=False):
        '''Return, as a set, the keys the client reports for map `mapname`.'''
        keys = set()
        # XXX: should follow "read your writes"
        # the final result should include:
        # 1. all created locally
        # 2. all existing globally minus the ones deleted locally

        # TODO: 1. check local data layer first: get locally created and deleted

        # 2. retrieve all existing globally
        dlc = self._get_data_layer_client(is_private)
        k2 = dlc.getMapKeys(mapname)
        if k2 is not None:
            # TODO: 3. remove the ones deleted locally
            keys = keys.union(k2)

        return keys

    def clearMap(self, mapname, is_private=False, is_queued=False):
        '''Clear map `mapname` via the client; no-op when `is_queued`.'''
        if is_queued:
            # TODO: use transient data structure in memory when the operation is queued
            return
        dlc = self._get_data_layer_client(is_private)
        dlc.clearMap(mapname)

    def deleteMap(self, mapname, is_private=False, is_queued=False):
        '''Delete map `mapname` via the client; no-op when `is_queued`.'''
        if is_queued:
            # TODO: use transient data structure in memory when the operation is queued
            return
        dlc = self._get_data_layer_client(is_private)
        dlc.deleteMap(mapname)

    def getMapNames(self, start_index=0, end_index=2147483647, is_private=False):
        '''Return, as a list, the map names reported by the client.'''
        maps = set()
        # XXX: should follow "read your writes"
        # the final result should include:
        # 1. all created locally
        # 2. all existing globally minus the ones deleted locally

        # TODO: 1. check local data layer first: get locally created and deleted

        # 2. retrieve all existing globally
        dlc = self._get_data_layer_client(is_private)
        m2 = dlc.getMapNames(start_index, end_index)
        if m2 is not None:
            # TODO: 3. remove the ones deleted locally
            maps = maps.union(m2)

        return list(maps)

    # set operations
    def createSet(self, setname, is_private=False, is_queued=False):
        '''Create set `setname` via the client; no-op when `is_queued`.'''
        if is_queued:
            # TODO: use transient data structure in memory when the operation is queued
            return
        dlc = self._get_data_layer_client(is_private)
        dlc.createSet(setname)

    def addSetEntry(self, setname, item, is_private=False, is_queued=False):
        '''Add `item` to set `setname`; no-op when `is_queued`.'''
        if is_queued:
            # TODO: use transient data structure in memory when the operation is queued
            return
        dlc = self._get_data_layer_client(is_private)
        dlc.addSetEntry(setname, item)

    def removeSetEntry(self, setname, item, is_private=False, is_queued=False):
        '''Remove `item` from set `setname`; no-op when `is_queued`.'''
        if is_queued:
            # TODO: use transient data structure in memory when the operation is queued
            return
        dlc = self._get_data_layer_client(is_private)
        dlc.removeSetEntry(setname, item)

    def containsSetItem(self, setname, item, is_private=False):
        '''Return the client's answer to whether set `setname` contains `item`.'''
        # TODO: check transient data structure first
        dlc = self._get_data_layer_client(is_private)
        return dlc.containsSetItem(setname, item)

    def retrieveSet(self, setname, is_private=False):
        '''Return, as a set, the items the client reports for `setname`.'''
        items = set()
        # XXX: should follow "read your writes"
        # the final result should include:
        # 1. all created locally
        # 2. all existing globally minus the ones deleted locally

        # TODO: 1. check local data layer first: get locally created and deleted

        # 2. retrieve all existing globally
        dlc = self._get_data_layer_client(is_private)
        i2 = dlc.retrieveSet(setname)
        if i2 is not None:
            # TODO: 3. remove the ones deleted locally
            items = items.union(i2)

        return items

    def clearSet(self, setname, is_private=False, is_queued=False):
        '''Clear set `setname` via the client; no-op when `is_queued`.'''
        if is_queued:
            # TODO: use transient data structure in memory when the operation is queued
            return
        dlc = self._get_data_layer_client(is_private)
        dlc.clearSet(setname)

    def deleteSet(self, setname, is_private=False, is_queued=False):
        '''Delete set `setname` via the client; no-op when `is_queued`.'''
        if is_queued:
            # TODO: use transient data structure in memory when the operation is queued
            return
        dlc = self._get_data_layer_client(is_private)
        dlc.deleteSet(setname)

    def getSetNames(self, start_index=0, end_index=2147483647, is_private=False):
        '''Return, as a list, the set names reported by the client.'''
        sets = set()
        # XXX: should follow "read your writes"
        # the final result should include:
        # 1. all created locally
        # 2. all existing globally minus the ones deleted locally

        # TODO: 1. check local data layer first: get locally created and deleted

        # 2. retrieve all existing globally
        dlc = self._get_data_layer_client(is_private)
        s2 = dlc.getSetNames(start_index, end_index)
        if s2 is not None:
            # TODO: 3. remove the ones deleted locally
            sets = sets.union(s2)

        return list(sets)

    # counter operations
    def createCounter(self, countername, count, is_private=False, is_queued=False):
        '''Create counter `countername` with `count`; no-op when `is_queued`.'''
        if is_queued:
            # TODO: use transient data structure in memory when the operation is queued
            return
        dlc = self._get_data_layer_client(is_private)
        dlc.createCounter(countername, count)

    def getCounterValue(self, countername, is_private=False):
        '''Return the client's value for counter `countername`.'''
        # TODO: check transient data structure first and apply any changes to the global value
        dlc = self._get_data_layer_client(is_private)
        return dlc.getCounter(countername)

    def incrementCounter(self, countername, increment, is_private=False, is_queued=False):
        '''Increment counter `countername` by `increment`; no-op when `is_queued`.'''
        if is_queued:
            # TODO: use transient data structure in memory when the operation is queued
            return
        dlc = self._get_data_layer_client(is_private)
        dlc.incrementCounter(countername, increment)

    def decrementCounter(self, countername, decrement, is_private=False, is_queued=False):
        '''Decrement counter `countername` by `decrement`; no-op when `is_queued`.'''
        if is_queued:
            # TODO: use transient data structure in memory when the operation is queued
            return
        dlc = self._get_data_layer_client(is_private)
        dlc.decrementCounter(countername, decrement)

    def deleteCounter(self, countername, is_private=False, is_queued=False):
        '''Delete counter `countername` via the client; no-op when `is_queued`.'''
        if is_queued:
            # TODO: use transient data structure in memory when the operation is queued
            return
        dlc = self._get_data_layer_client(is_private)
        dlc.deleteCounter(countername)

    def getCounterNames(self, start_index=0, end_index=2147483647, is_private=False):
        '''Return, as a list, the counter names reported by the client.'''
        counters = set()
        # XXX: should follow "read your writes"
        # the final result should include:
        # 1. all created locally
        # 2. all existing globally minus the ones deleted locally

        # TODO: 1. check local data layer first: get locally created and deleted

        # 2. retrieve all existing globally
        dlc = self._get_data_layer_client(is_private)
        c2 = dlc.getCounterNames(start_index, end_index)
        if c2 is not None:
            # TODO: 3. remove the ones deleted locally
            counters = counters.union(c2)

        return list(counters)

    def get_transient_data_output(self, is_private=False):
        '''
        Return the transient data, so that it can be committed to the data layer
        when the function instance finishes.
        '''
        if is_private:
            return self.transient_data_output_private
        return self.transient_data_output

    def get_data_to_be_deleted(self, is_private=False):
        '''
        Return the list of deleted data items, so that they can be committed to the data layer
        when the function instance finishes.
        '''
        if is_private:
            return self.data_to_be_deleted_private
        return self.data_to_be_deleted

    def _get_data_layer_client(self, is_private=False):
        '''
        Return the data layer client, so that it can be used to commit to the data layer
        when the function instance finishes.
        If it is not initialized yet, it will be initialized here.
        '''
        # TODO: need also the locality information
        if is_private:
            if self._data_layer_client_private is None:
                self._data_layer_client_private = DataLayerClient(locality=1, sid=self._sandboxid, wid=self._workflowid, is_wf_private=True, connect=self._datalayer)
            return self._data_layer_client_private

        if self._data_layer_client is None:
            self._data_layer_client = DataLayerClient(locality=1, suid=self._storage_userid, is_wf_private=False, connect=self._datalayer)
        return self._data_layer_client

    def _shutdown_data_layer_client(self):
        '''
        Shut down the data layer client if it has been initialized
        after the function instance finishes committing changes
        to the data layer.
        '''
        if self._data_layer_client_private is not None:
            self._data_layer_client_private.shutdown()
            self._data_layer_client_private = None

        if self._data_layer_client is not None:
            self._data_layer_client.shutdown()
            # Mirror the private branch: forget the shut-down client so that
            # _get_data_layer_client() creates a fresh one if called again.
            # NOTE(review): the listing this was taken from is cut off right
            # after the shutdown() call above - confirm against the full file.
            self._data_layer_client = None

Full Screen

Full Screen

file_manager.py

Source:file_manager.py Github

copy

Full Screen

1# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors2# MIT License. See license.txt3from __future__ import unicode_literals4import frappe5import os, base64, re6import hashlib7import mimetypes8import io9from frappe.utils import get_hook_method, get_files_path, random_string, encode, cstr, call_hook_method, cint10from frappe import _11from frappe import conf12from copy import copy13from six.moves.urllib.parse import unquote14from six import text_type, PY215class MaxFileSizeReachedError(frappe.ValidationError):16 pass17def get_file_url(file_data_name):18 data = frappe.db.get_value("File", file_data_name, ["file_name", "file_url"], as_dict=True)19 return data.file_url or data.file_name20def upload():21 # get record details22 dt = frappe.form_dict.doctype23 dn = frappe.form_dict.docname24 df = frappe.form_dict.docfield25 file_url = frappe.form_dict.file_url26 filename = frappe.form_dict.filename27 frappe.form_dict.is_private = cint(frappe.form_dict.is_private)28 if not filename and not file_url:29 frappe.msgprint(_("Please select a file or url"),30 raise_exception=True)31 file_doc = get_file_doc()32 comment = {}33 if dt and dn:34 comment = frappe.get_doc(dt, dn).add_comment("Attachment",35 _("added {0}").format("<a href='{file_url}' target='_blank'>{file_name}</a>{icon}".format(**{36 "icon": ' <i class="fa fa-lock text-warning"></i>' \37 if file_doc.is_private else "",38 "file_url": file_doc.file_url.replace("#", "%23") \39 if file_doc.file_name else file_doc.file_url,40 "file_name": file_doc.file_name or file_doc.file_url41 })))42 return {43 "name": file_doc.name,44 "file_name": file_doc.file_name,45 "file_url": file_doc.file_url,46 "is_private": file_doc.is_private,47 "comment": comment.as_dict() if comment else {}48 }49def get_file_doc(dt=None, dn=None, folder=None, is_private=None, df=None):50 '''returns File object (Document) from given parameters or form_dict'''51 r = frappe.form_dict52 if dt is None: dt = r.doctype53 if dn is None: dn = 
r.docname54 if df is None: df = r.docfield55 if folder is None: folder = r.folder56 if is_private is None: is_private = r.is_private57 if r.filedata:58 file_doc = save_uploaded(dt, dn, folder, is_private, df)59 elif r.file_url:60 file_doc = save_url(r.file_url, r.filename, dt, dn, folder, is_private, df)61 return file_doc62def save_uploaded(dt, dn, folder, is_private, df=None):63 fname, content = get_uploaded_content()64 if content:65 return save_file(fname, content, dt, dn, folder, is_private=is_private, df=df);66 else:67 raise Exception68def save_url(file_url, filename, dt, dn, folder, is_private, df=None):69 # if not (file_url.startswith("http://") or file_url.startswith("https://")):70 # frappe.msgprint("URL must start with 'http://' or 'https://'")71 # return None, None72 file_url = unquote(file_url)73 file_size = frappe.form_dict.file_size74 f = frappe.get_doc({75 "doctype": "File",76 "file_url": file_url,77 "file_name": filename,78 "attached_to_doctype": dt,79 "attached_to_name": dn,80 "attached_to_field": df,81 "folder": folder,82 "file_size": file_size,83 "is_private": is_private84 })85 f.flags.ignore_permissions = True86 try:87 f.insert()88 except frappe.DuplicateEntryError:89 return frappe.get_doc("File", f.duplicate_entry)90 return f91def get_uploaded_content():92 # should not be unicode when reading a file, hence using frappe.form93 if 'filedata' in frappe.form_dict:94 if "," in frappe.form_dict.filedata:95 frappe.form_dict.filedata = frappe.form_dict.filedata.rsplit(",", 1)[1]96 frappe.uploaded_content = base64.b64decode(frappe.form_dict.filedata)97 frappe.uploaded_filename = frappe.form_dict.filename98 return frappe.uploaded_filename, frappe.uploaded_content99 else:100 frappe.msgprint(_('No file attached'))101 return None, None102def save_file(fname, content, dt, dn, folder=None, decode=False, is_private=0, df=None):103 if decode:104 if isinstance(content, text_type):105 content = content.encode("utf-8")106 if b"," in content:107 content = 
content.split(",")[1]108 content = base64.b64decode(content)109 file_size = check_max_file_size(content)110 content_hash = get_content_hash(content)111 content_type = mimetypes.guess_type(fname)[0]112 fname = get_file_name(fname, content_hash[-6:])113 file_data = get_file_data_from_hash(content_hash, is_private=is_private)114 if not file_data:115 call_hook_method("before_write_file", file_size=file_size)116 write_file_method = get_hook_method('write_file', fallback=save_file_on_filesystem)117 file_data = write_file_method(fname, content, content_type=content_type, is_private=is_private)118 file_data = copy(file_data)119 file_data.update({120 "doctype": "File",121 "attached_to_doctype": dt,122 "attached_to_name": dn,123 "attached_to_field": df,124 "folder": folder,125 "file_size": file_size,126 "content_hash": content_hash,127 "is_private": is_private128 })129 f = frappe.get_doc(file_data)130 f.flags.ignore_permissions = True131 try:132 f.insert()133 except frappe.DuplicateEntryError:134 return frappe.get_doc("File", f.duplicate_entry)135 return f136def get_file_data_from_hash(content_hash, is_private=0):137 for name in frappe.db.sql_list("select name from `tabFile` where content_hash=%s and is_private=%s", (content_hash, is_private)):138 b = frappe.get_doc('File', name)139 return {k: b.get(k) for k in frappe.get_hooks()['write_file_keys']}140 return False141def save_file_on_filesystem(fname, content, content_type=None, is_private=0):142 fpath = write_file(content, fname, is_private)143 if is_private:144 file_url = "/private/files/{0}".format(fname)145 else:146 file_url = "/files/{0}".format(fname)147 return {148 'file_name': os.path.basename(fpath),149 'file_url': file_url150 }151def get_max_file_size():152 return conf.get('max_file_size') or 10485760153def check_max_file_size(content):154 max_file_size = get_max_file_size()155 file_size = len(content)156 if file_size > max_file_size:157 frappe.msgprint(_("File size exceeded the maximum allowed size of {0} 
MB").format(158 max_file_size / 1048576),159 raise_exception=MaxFileSizeReachedError)160 return file_size161def write_file(content, fname, is_private=0):162 """write file to disk with a random name (to compare)"""163 file_path = get_files_path(is_private=is_private)164 # create directory (if not exists)165 frappe.create_folder(file_path)166 # write the file167 if isinstance(content, text_type):168 content = content.encode()169 with open(os.path.join(file_path.encode('utf-8'), fname.encode('utf-8')), 'wb+') as f:170 f.write(content)171 return get_files_path(fname, is_private=is_private)172def remove_all(dt, dn, from_delete=False):173 """remove all files in a transaction"""174 try:175 for fid in frappe.db.sql_list("""select name from `tabFile` where176 attached_to_doctype=%s and attached_to_name=%s""", (dt, dn)):177 remove_file(fid, dt, dn, from_delete)178 except Exception as e:179 if e.args[0]!=1054: raise # (temp till for patched)180def remove_file_by_url(file_url, doctype=None, name=None):181 if doctype and name:182 fid = frappe.db.get_value("File", {"file_url": file_url,183 "attached_to_doctype": doctype, "attached_to_name": name})184 else:185 fid = frappe.db.get_value("File", {"file_url": file_url})186 if fid:187 return remove_file(fid)188def remove_file(fid, attached_to_doctype=None, attached_to_name=None, from_delete=False):189 """Remove file and File entry"""190 file_name = None191 if not (attached_to_doctype and attached_to_name):192 attached = frappe.db.get_value("File", fid,193 ["attached_to_doctype", "attached_to_name", "file_name"])194 if attached:195 attached_to_doctype, attached_to_name, file_name = attached196 ignore_permissions, comment = False, None197 if attached_to_doctype and attached_to_name and not from_delete:198 doc = frappe.get_doc(attached_to_doctype, attached_to_name)199 ignore_permissions = doc.has_permission("write") or False200 if frappe.flags.in_web_form:201 ignore_permissions = True202 if not file_name:203 file_name = 
frappe.db.get_value("File", fid, "file_name")204 comment = doc.add_comment("Attachment Removed", _("Removed {0}").format(file_name))205 frappe.delete_doc("File", fid, ignore_permissions=ignore_permissions)206 return comment207def delete_file_data_content(doc, only_thumbnail=False):208 method = get_hook_method('delete_file_data_content', fallback=delete_file_from_filesystem)209 method(doc, only_thumbnail=only_thumbnail)210def delete_file_from_filesystem(doc, only_thumbnail=False):211 """Delete file, thumbnail from File document"""212 if only_thumbnail:213 delete_file(doc.thumbnail_url)214 else:215 delete_file(doc.file_url)216 delete_file(doc.thumbnail_url)217def delete_file(path):218 """Delete file from `public folder`"""219 if path:220 if ".." in path.split("/"):221 frappe.msgprint(_("It is risky to delete this file: {0}. Please contact your System Manager.").format(path))222 parts = os.path.split(path.strip("/"))223 if parts[0]=="files":224 path = frappe.utils.get_site_path("public", "files", parts[-1])225 else:226 path = frappe.utils.get_site_path("private", "files", parts[-1])227 path = encode(path)228 if os.path.exists(path):229 os.remove(path)230def get_file(fname):231 """Returns [`file_name`, `content`] for given file name `fname`"""232 file_path = get_file_path(fname)233 # read the file234 if PY2:235 with open(encode(file_path)) as f:236 content = f.read()237 else:238 with io.open(encode(file_path), mode='rb') as f:239 try:240 # for plain text files241 content = f.read().decode()242 except UnicodeDecodeError:243 # for .png, .jpg, etc244 content = f.read()245 return [file_path.rsplit("/", 1)[-1], content]246def get_file_path(file_name):247 """Returns file path from given file name"""248 f = frappe.db.sql("""select file_url from `tabFile`249 where name=%s or file_name=%s""", (file_name, file_name))250 if f:251 file_name = f[0][0]252 file_path = file_name253 if "/" not in file_path:254 file_path = "/files/" + file_path255 if 
file_path.startswith("/private/files/"):256 file_path = get_files_path(*file_path.split("/private/files/", 1)[1].split("/"), is_private=1)257 elif file_path.startswith("/files/"):258 file_path = get_files_path(*file_path.split("/files/", 1)[1].split("/"))259 else:260 frappe.throw(_("There is some problem with the file url: {0}").format(file_path))261 return file_path262def get_content_hash(content):263 if isinstance(content, text_type):264 content = content.encode()265 return hashlib.md5(content).hexdigest()266def get_file_name(fname, optional_suffix):267 # convert to unicode268 fname = cstr(fname)269 n_records = frappe.db.sql("select name from `tabFile` where file_name=%s", fname)270 if len(n_records) > 0 or os.path.exists(encode(get_files_path(fname))):271 f = fname.rsplit('.', 1)272 if len(f) == 1:273 partial, extn = f[0], ""274 else:275 partial, extn = f[0], "." + f[1]276 return '{partial}{suffix}{extn}'.format(partial=partial, extn=extn, suffix=optional_suffix)277 return fname278@frappe.whitelist()279def download_file(file_url):280 """281 Download file using token and REST API. 
Valid session or282 token is required to download private files.283 Method : GET284 Endpoint : frappe.utils.file_manager.download_file285 URL Params : file_name = /path/to/file relative to site path286 """287 file_doc = frappe.get_doc("File", {"file_url":file_url})288 file_doc.check_permission("read")289 with open(getattr(frappe.local, "site_path", None) + file_url, "rb") as fileobj:290 filedata = fileobj.read()291 frappe.local.response.filename = file_url[file_url.rfind("/")+1:]292 frappe.local.response.filecontent = filedata293 frappe.local.response.type = "download"294def extract_images_from_doc(doc, fieldname):295 content = doc.get(fieldname)296 content = extract_images_from_html(doc, content)297 if frappe.flags.has_dataurl:298 doc.set(fieldname, content)299def extract_images_from_html(doc, content):300 frappe.flags.has_dataurl = False301 def _save_file(match):302 data = match.group(1)303 data = data.split("data:")[1]304 headers, content = data.split(",")305 if "filename=" in headers:306 filename = headers.split("filename=")[-1]307 # decode filename308 if not isinstance(filename, text_type):309 filename = text_type(filename, 'utf-8')310 else:311 mtype = headers.split(";")[0]312 filename = get_random_filename(content_type=mtype)313 doctype = doc.parenttype if doc.parent else doc.doctype314 name = doc.parent or doc.name315 # TODO fix this316 file_url = save_file(filename, content, doctype, name, decode=True).get("file_url")317 if not frappe.flags.has_dataurl:318 frappe.flags.has_dataurl = True319 return '<img src="{file_url}"'.format(file_url=file_url)320 if content:321 content = re.sub('<img[^>]*src\s*=\s*["\'](?=data:)(.*?)["\']', _save_file, content)322 return content323def get_random_filename(extn=None, content_type=None):324 if extn:325 if not extn.startswith("."):326 extn = "." 
+ extn327 elif content_type:328 extn = mimetypes.guess_extension(content_type)329 return random_string(7) + (extn or "")330@frappe.whitelist()331def validate_filename(filename):332 from frappe.utils import now_datetime333 timestamp = now_datetime().strftime(" %Y-%m-%d %H:%M:%S")334 fname = get_file_name(filename, timestamp)...

Full Screen

Full Screen

_NodeLocalQueuesMethods.py

Source:_NodeLocalQueuesMethods.py Github

copy

Full Screen

1import sys2import os3sys.path.append(os.path.dirname(os.path.realpath(__file__)))4from _utils import eprint5from _NodeInternalClasses import CloseableQueue6def put(self, value, timeout=None, block=True):7 if self._server == None:8 self._queue.put(value)9 else:10 session_id = self._get_session_id()11 self._request(session_id, value=value, timeout=timeout, block=block)12 response = self._recv_response(session_id)13 if not response["success"]:14 eprint(response["traceback"])15 raise response["exception"]16def get(self, timeout=None, block=True):17 if self._server == None:18 return self._queue.get()19 else:20 session_id = self._get_session_id()21 self._request(session_id, timeout=timeout, block=block)22 response = self._recv_response(session_id)23 if response["success"]:24 return response["data"]["value"]25 else:26 eprint(response["traceback"])27 raise response["exception"]28def qsize(self):29 if self._server == None:30 return self._queue.qsize()31 else:32 session_id = self._get_session_id()33 self._request(session_id)34 response = self._recv_response(session_id)35 if response["success"]:36 return response["data"]["qsize"]37 else:38 eprint(response["traceback"])39 raise response["exception"]40def _locals_queue_put(self, name, value, timeout, block, is_private):41 if self._server == None:42 if not is_private:43 if name not in self._actual_queues:44 self._actual_queues[name] = CloseableQueue()45 self._actual_queues[name].put(value, timeout=timeout, block=block)46 else:47 if name not in self._actual_result_queues_for_future:48 self._actual_result_queues_for_future[name] = CloseableQueue()49 self._actual_result_queues_for_future[name].put(value, timeout=timeout, block=block)50 else:51 session_id = self._get_session_id()52 self._request(session_id, name=name, value=value, timeout=timeout, block=block, is_private=is_private)53 response = self._recv_response(session_id)54 if not response["success"]:55 eprint(response["traceback"])56 raise response["exception"]57def 
_locals_queue_get(self, name, timeout, block, is_private):58 if self._server == None:59 if not is_private:60 if name not in self._actual_queues:61 self._actual_queues[name] = CloseableQueue()62 return self._actual_queues[name].get(timeout=timeout, block=block)63 else:64 if name not in self._actual_result_queues_for_future:65 self._actual_result_queues_for_future[name] = CloseableQueue()66 return self._actual_result_queues_for_future[name].get(timeout=timeout, block=block)67 else:68 session_id = self._get_session_id()69 self._request(session_id, name=name, timeout=timeout, block=block, is_private=is_private)70 response = self._recv_response(session_id)71 if response["success"]:72 return response["data"]["value"]73 else:74 eprint(response["traceback"])75 raise response["exception"]76def _locals_queue_close(self, name, is_private):77 if self._server == None:78 if not is_private:79 if name not in self._actual_queues:80 return81 self._actual_queues[name].close()82 else:83 if name not in self._actual_result_queues_for_future:84 return85 self._actual_result_queues_for_future[name].close()86 else:87 session_id = self._get_session_id()88 self._request(session_id, name=name, is_private=is_private)89 response = self._recv_response(session_id)90 if not response["success"]:91 eprint(response["traceback"])92 raise response["exception"]93def _locals_queue_len(self, name, is_private):94 if self._server == None:95 if not is_private:96 if name not in self._actual_queues:97 return 098 else:99 return self._actual_queues[name].qsize()100 else:101 if name not in self._actual_result_queues_for_future:102 return 0103 else:104 return self._actual_result_queues_for_future[name].qsize()105 else:106 session_id = self._get_session_id()107 self._request(session_id, name=name, is_private=is_private)108 response = self._recv_response(session_id)109 if response["success"]:110 return response["data"]["len"]111 else:112 eprint(response["traceback"])113 raise response["exception"]114def 
_locals_queues_delitem(self, name, is_private):115 if self._server == None:116 if not is_private:117 self._actual_queues[name].close()118 del self._actual_queues[name]119 else:120 self._actual_result_queues_for_future[name].close()121 del self._actual_result_queues_for_future[name]122 else:123 session_id = self._get_session_id()124 self._request(session_id, name=name, is_private=is_private)125 response = self._recv_response(session_id)126 if not response["success"]:127 eprint(response["traceback"])128 raise response["exception"]129def _locals_queues_len(self, is_private):130 if self._server == None:131 if not is_private:132 return self._actual_queues.__len__()133 else:134 return self._actual_result_queues_for_future.__len__()135 else:136 session_id = self._get_session_id()137 self._request(session_id, is_private=is_private)138 response = self._recv_response(session_id)139 if response["success"]:140 return response["data"]["len"]141 else:142 eprint(response["traceback"])143 raise response["exception"]144def _locals_queues_contains(self, name, is_private):145 if self._server == None:146 if not is_private:147 return self._actual_queues.__contains__(name)148 else:149 return self._actual_result_queues_for_future.__contains__(name)150 else:151 session_id = self._get_session_id()152 self._request(session_id, name=name, is_private=is_private)153 response = self._recv_response(session_id)154 if response["success"]:155 return response["data"]["contains"]156 else:157 eprint(response["traceback"])158 raise response["exception"]159def _locals_queues_keys(self, is_private):160 if self._server == None:161 if not is_private:162 return self._actual_queues.keys()163 else:164 return self._actual_result_queues_for_future.keys()165 else:166 session_id = self._get_session_id()167 self._request(session_id, is_private=is_private)168 response = self._recv_response(session_id)169 if response["success"]:170 return response["data"]["keys"]171 else:172 eprint(response["traceback"])173 raise 
response["exception"]174def _locals_queues_iter(self, is_private):175 if self._server == None:176 if not is_private:177 return self._actual_queues.__iter__()178 else:179 return self._actual_result_queues_for_future.__iter__()180 else:181 session_id = self._get_session_id()182 self._request(session_id, is_private=is_private)183 response = self._recv_response(session_id)184 if response["success"]:185 return response["data"]["iter"]186 else:187 eprint(response["traceback"])188 raise response["exception"]189def _locals_queues_clear(self, is_private):190 if self._server == None:191 if not is_private:192 for name in self._actual_queues:193 self._actual_queues[name].close()194 self._actual_queues.clear()195 else:196 for name in self._actual_result_queues_for_future:197 self._actual_result_queues_for_future[name].close()198 self._actual_result_queues_for_future.clear()199 else:200 session_id = self._get_session_id()201 self._request(session_id, is_private=is_private)202 response = self._recv_response(session_id)203 if not response["success"]:204 eprint(response["traceback"])205 raise response["exception"]206def _process_put(self, request):207 try:208 self._queue.put(request["data"]["value"], timeout=request["data"]["timeout"], block=request["block"])209 self._respond_ok(request["session_id"])210 except BaseException as e:211 self._respond_exception(request["session_id"], e)212def _process_get(self, request):213 try:214 value = self._queue.get(timeout=request["data"]["timeout"], block=request["block"])215 self._respond_ok(request["session_id"], value=value)216 except BaseException as e:217 self._respond_exception(request["session_id"], e)218def _process_qsize(self, request):219 try:220 qsize = self._queue.qsize()221 self._respond_ok(request["session_id"], qsize=qsize)222 except BaseException as e:223 self._respond_exception(request["session_id"], e)224def _process__locals_queue_put(self, request):225 try:226 name = request["data"]["name"]227 if not 
request["data"]["is_private"]:228 if name not in self._actual_queues:229 self._actual_queues[name] = CloseableQueue()230 self._actual_queues[name].put(request["data"]["value"], timeout=request["data"]["timeout"], block=request["block"])231 else:232 if name not in self._actual_result_queues_for_future:233 self._actual_result_queues_for_future[name] = CloseableQueue()234 self._actual_result_queues_for_future[name].put(request["data"]["value"], timeout=request["data"]["timeout"], block=request["block"])235 self._respond_ok(request["session_id"])236 except BaseException as e:237 self._respond_exception(request["session_id"], e)238def _process__locals_queue_get(self, request):239 try:240 name = request["data"]["name"]241 if not request["data"]["is_private"]:242 if name not in self._actual_queues:243 self._actual_queues[name] = CloseableQueue()244 value = self._actual_queues[request["data"]["name"]].get(timeout=request["data"]["timeout"], block=request["block"])245 else:246 if name not in self._actual_result_queues_for_future:247 self._actual_result_queues_for_future[name] = CloseableQueue()248 value = self._actual_result_queues_for_future[request["data"]["name"]].get(timeout=request["data"]["timeout"], block=request["block"])249 self._respond_ok(request["session_id"], value=value)250 except BaseException as e:251 self._respond_exception(request["session_id"], e)252def _process__locals_queue_len(self, request):253 try:254 if not request["data"]["is_private"]:255 if request["data"]["name"] not in self._actual_queues:256 self._respond_ok(request["session_id"], len=0)257 else:258 self._respond_ok(request["session_id"], len=self._actual_queues[request["data"]["name"]].qsize())259 else:260 if request["data"]["name"] not in self._actual_result_queues_for_future:261 self._respond_ok(request["session_id"], len=0)262 else:263 self._respond_ok(request["session_id"], len=self._actual_result_queues_for_future[request["data"]["name"]].qsize())264 except BaseException as e:265 
self._respond_exception(request["session_id"], e)266def _process__locals_queue_close(self, request):267 try:268 name = request["data"]["name"]269 if not request["data"]["is_private"]:270 try:271 self._actual_queues[name].close()272 except:273 pass274 else:275 try:276 self._actual_result_queues_for_future[name].close()277 except:278 pass279 self._respond_ok(request["session_id"])280 except BaseException as e:281 self._respond_exception(request["session_id"], e)282def _process__locals_queues_delitem(self, request):283 try:284 name = request["data"]["name"]285 if not request["data"]["is_private"]:286 try:287 self._actual_queues[name].close()288 except:289 pass290 del self._actual_queues[name]291 else:292 try:293 self._actual_result_queues_for_future[name].close()294 except:295 pass296 del self._actual_result_queues_for_future[name]297 self._respond_ok(request["session_id"])298 except BaseException as e:299 self._respond_exception(request["session_id"], e)300def _process__locals_queues_len(self, request):301 try:302 if not request["data"]["is_private"]:303 self._respond_ok(request["session_id"], len=self._actual_queues.__len__())304 else:305 self._respond_ok(request["session_id"], len=self._actual_result_queues_for_future.__len__())306 except BaseException as e:307 self._respond_exception(request["session_id"], e)308def _process__locals_queues_contains(self, request):309 try:310 if not request["data"]["is_private"]:311 self._respond_ok(request["session_id"], contains=self._actual_queues.__contains__(request["data"]["name"]))312 else:313 self._respond_ok(request["session_id"], contains=self._actual_result_queues_for_future.__contains__(request["data"]["name"]))314 except BaseException as e:315 self._respond_exception(request["session_id"], e)316def _process__locals_queues_keys(self, request):317 try:318 if not request["data"]["is_private"]:319 self._respond_ok(request["session_id"], keys=self._actual_queues.keys())320 else:321 self._respond_ok(request["session_id"], 
keys=self._actual_result_queues_for_future.keys())322 except BaseException as e:323 self._respond_exception(request["session_id"], e)324def _process__locals_queues_iter(self, request):325 try:326 if not request["data"]["is_private"]:327 self._respond_ok(request["session_id"], iter=self._actual_queues.__iter__())328 else:329 self._respond_ok(request["session_id"], iter=self._actual_result_queues_for_future.__iter__())330 except BaseException as e:331 self._respond_exception(request["session_id"], e)332def _process__locals_queues_clear(self, request):333 try:334 if not request["data"]["is_private"]:335 for name in self._actual_queues:336 try:337 self._actual_queues[name].close()338 except:339 pass340 self._actual_queues.clear()341 else:342 for name in self._actual_result_queues_for_future:343 try:344 self._actual_result_queues_for_future[name].close()345 except:346 pass347 self._actual_result_queues_for_future.clear()348 self._respond_ok(request["session_id"])349 except BaseException as e:...

Full Screen

Full Screen

test_bmarkmgr.py

Source: test_bmarkmgr.py (GitHub)

copy

Full Screen

1"""Test the basics including the bmark and tags"""2from random import randint3from bookie.models import (4 Bmark,5 BmarkMgr,6 DBSession,7 TagMgr,8)9from bookie.models.auth import User10from bookie.models.stats import (11 StatBookmark,12 StatBookmarkMgr,13)14from bookie.tests import gen_random_word15from bookie.tests import TestDBBase16class TestBmarkMgrStats(TestDBBase):17 """Handle some bmarkmgr stats checks"""18 def test_total_ct(self):19 """Verify that our total count method is working"""20 ct = 521 user = User()22 user.username = gen_random_word(10)23 DBSession.add(user)24 for i in range(ct):25 b = Bmark(26 url=gen_random_word(12),27 username=user.username,28 is_private=False29 )30 b.hash_id = gen_random_word(3)31 DBSession.add(b)32 ct = BmarkMgr.count()33 self.assertEqual(5, ct,34 'We should have 5 public bookmarks: ' + str(ct))35 def test_total_ct_accounts_for_privacy(self):36 """Verify that our total count method is working"""37 ct = 538 user = User()39 user.username = gen_random_word(10)40 DBSession.add(user)41 for i in range(ct):42 b = Bmark(43 url=gen_random_word(12),44 username=user.username,45 is_private=True46 )47 b.hash_id = gen_random_word(3)48 DBSession.add(b)49 ct = BmarkMgr.count(is_private=True)50 self.assertEqual(5, ct,51 'We should have 5 private bookmarks: ' + str(ct))52 def test_unique_ct(self):53 """Verify that our unique count method is working"""54 ct = 555 common = u'testing.com'56 users = []57 for i in range(ct):58 user = User()59 user.username = gen_random_word(10)60 DBSession.add(user)61 users.append(user)62 for i in range(ct - 2):63 b = Bmark(64 url=gen_random_word(12),65 username=users[i].username,66 is_private=False67 )68 DBSession.add(b)69 # Add in our dupes70 c = Bmark(71 url=common,72 username=users[3].username,73 is_private=False74 )75 DBSession.add(c)76 DBSession.flush()77 d = Bmark(78 url=common,79 username=users[4].username,80 is_private=False81 )82 DBSession.add(d)83 DBSession.flush()84 ct = BmarkMgr.count(distinct=True)85 
self.assertEqual(4, ct,86 'We should have 4 public bookmarks: ' + str(ct))87 def test_unique_ct_accounts_for_privacy(self):88 """Verify that our unique count method is working"""89 ct = 590 common = u'testing.com'91 users = []92 for i in range(ct):93 user = User()94 user.username = gen_random_word(10)95 DBSession.add(user)96 users.append(user)97 for i in range(ct - 2):98 b = Bmark(99 url=gen_random_word(12),100 username=users[i].username,101 is_private=True102 )103 DBSession.add(b)104 # Add in our dupes105 c = Bmark(106 url=common,107 username=users[3].username,108 is_private=True109 )110 DBSession.add(c)111 DBSession.flush()112 d = Bmark(113 url=common,114 username=users[4].username,115 is_private=True,116 )117 DBSession.add(d)118 DBSession.flush()119 ct = BmarkMgr.count(distinct=True, is_private=True)120 self.assertEqual(4, ct,121 'We should have 4 private bookmarks: ' + str(ct))122 def test_per_user(self):123 """We should only get a pair of results for this single user"""124 ct = 5125 common = u'testing.com'126 user = User()127 user.username = gen_random_word(10)128 DBSession.add(user)129 usercommon = User()130 usercommon.username = common131 DBSession.add(usercommon)132 for i in range(ct - 2):133 b = Bmark(134 url=gen_random_word(12),135 username=user.username,136 is_private=False137 )138 DBSession.add(b)139 # add in our dupes140 c = Bmark(141 url=gen_random_word(10),142 username=usercommon.username,143 is_private=False144 )145 DBSession.add(c)146 DBSession.flush()147 d = Bmark(148 url=gen_random_word(10),149 username=usercommon.username,150 is_private=False151 )152 DBSession.add(d)153 DBSession.flush()154 ct = BmarkMgr.count(username=usercommon.username)155 self.assertEqual(2, ct,156 'We should have 2 public bookmarks: ' + str(ct))157 def test_per_user_accounts_for_privacy(self):158 """We should only get a pair of results for this single user"""159 ct = 5160 common = u'testing.com'161 user = User()162 user.username = gen_random_word(10)163 
DBSession.add(user)164 usercommon = User()165 usercommon.username = common166 DBSession.add(usercommon)167 for i in range(ct - 2):168 b = Bmark(169 url=gen_random_word(12),170 username=user.username,171 is_private=True,172 )173 DBSession.add(b)174 c = Bmark(175 url=gen_random_word(10),176 username=usercommon.username,177 is_private=True,178 )179 DBSession.add(c)180 DBSession.flush()181 d = Bmark(182 url=gen_random_word(10),183 username=usercommon.username,184 is_private=True,185 )186 DBSession.add(d)187 DBSession.flush()188 ct = BmarkMgr.count(username=usercommon.username, is_private=True)189 self.assertEqual(2, ct,190 'We should only have 2 private bookmarks: ' + str(ct))191 def test_count_user_bookmarks(self):192 """We should get a total count of both private and public bookmarks"""193 public_bmark_count = 5194 private_bmark_count = 5195 total_bmark_count = public_bmark_count + private_bmark_count196 user = User()197 user.username = gen_random_word(10)198 DBSession.add(user)199 stat_username = "user_bookmarks_{0}".format(user.username)200 # Add the public bookmarks.201 for i in range(public_bmark_count):202 b = Bmark(203 url=gen_random_word(12),204 username=user.username,205 is_private=False,206 )207 DBSession.add(b)208 # Add the private bookmarks.209 for i in range(private_bmark_count):210 b = Bmark(211 url=gen_random_word(12),212 username=user.username,213 is_private=True,214 )215 DBSession.add(b)216 StatBookmarkMgr.count_user_bookmarks(username=user.username)217 res = DBSession.query(StatBookmark).\218 filter(StatBookmark.attrib == stat_username).first()219 self.assertEqual(220 total_bmark_count, res.data,221 'We should have {0} bookmarks: '.format(total_bmark_count) +222 str(res.data))223 def test_delete_all_bookmarks(self):224 """Testing working of delete all bookmarks225 Case 1: No bookmark present226 Case 2: One bookmark present227 Case 3: Multiple bookmarks present"""228 bmark_counts = [0, 1]229 for i in range(10):230 bmark_counts.append(randint(10, 
100))231 users = []232 for i in range(len(bmark_counts)):233 user = User()234 user.username = gen_random_word(10)235 users.append(user)236 DBSession.add(user)237 for j in range(i):238 b = Bmark(239 url=gen_random_word(12),240 username=user.username,241 tags=gen_random_word(4),242 )243 b.hash_id = gen_random_word(3)244 DBSession.add(b)245 DBSession.flush()246 for user in users:247 BmarkMgr.delete_all_bookmarks(user.username)248 ct = BmarkMgr.count(user.username)249 self.assertEqual(ct, 0, 'All the bookmarks should be deleted')250 tags = TagMgr.find(username=user.username)251 self.assertEqual(252 len(tags),253 0,254 'There should be no tags left: ' + str(len(tags))255 )256 DBSession.flush()257 def test_find_bookmarks_same_user(self):258 """A user requesting their bookmarks includes private ones"""259 bookmark_count_private = 5260 bookmark_count_public = 5261 user = User()262 user.username = gen_random_word(19)263 DBSession.add(user)264 for i in range(bookmark_count_private):265 b = Bmark(266 url=gen_random_word(12),267 username=user.username,268 )269 DBSession.add(b)270 for i in range(bookmark_count_public):271 b = Bmark(272 url=gen_random_word(12),273 username=user.username,274 is_private=False,275 )276 DBSession.add(b)277 DBSession.flush()278 res = BmarkMgr.find(username=user.username, requested_by=user.username)279 self.assertEqual(280 bookmark_count_private + bookmark_count_public,281 len(res),282 'There should be ' + str(bookmark_count_private +283 bookmark_count_public) +284 ' bookmarks present: ' + str(len(res))285 )286 def test_find_bookmarks_diff_user(self):287 """A user requesting another user's bookmarks get public only"""288 bookmark_count_private = 5289 bookmark_count_public = 5290 user = User()291 user.username = gen_random_word(19)292 DBSession.add(user)293 for i in range(bookmark_count_private):294 b = Bmark(295 url=gen_random_word(12),296 username=user.username,297 is_private=True,298 )299 DBSession.add(b)300 for i in range(bookmark_count_public):301 
b = Bmark(302 url=gen_random_word(12),303 username=user.username,304 is_private=False,305 )306 DBSession.add(b)307 DBSession.flush()308 res = BmarkMgr.find(username=user.username,309 requested_by=gen_random_word(19))310 self.assertEqual(311 bookmark_count_public,312 len(res),313 'There should be ' + str(bookmark_count_public) +314 ' bookmarks present: ' + str(len(res))315 )316 # Also check if requested_by is None.317 res = BmarkMgr.find(username=user.username, requested_by=None)318 self.assertEqual(319 bookmark_count_public,320 len(res),321 'There should be ' + str(bookmark_count_public) +322 ' bookmarks present: ' + str(len(res))...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Nose automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful