How to use the safe_encode method in Nose2

Best Python code snippet using nose2

browser.py

Source: browser.py (GitHub)

copy

Full Screen

...68 to getFolder(). This can be used for example to only show image files69 in a file system tree.70 """71 if six.PY2 and isinstance(path, six.text_type):72 path = safe_encode(path, 'utf-8')73 folders = []74 files = []75 path = self.normalizePath(path)76 folder = self.getObject(path)77 for name in folder.listDirectory():78 if IResourceDirectory.providedBy(folder[name]):79 folders.append(self.getInfo(80 folder[name],81 path='/{0}/{1}/'.format(path, name)82 ))83 else:84 files.append(self.getInfo(85 folder[name],86 path='/{0}/{1}'.format(path, name)87 ))88 return folders + files89 def getFile(self, path):90 if six.PY2:91 path = safe_encode(path, 'utf-8')92 path = self.normalizePath(path)93 ext = self.getExtension(path=path)94 result = {'ext': ext}95 self.request.response.setHeader('Content-Type', 'application/json')96 if ext in self.imageExtensions:97 obj = self.getObject(path)98 info = self.getInfo(obj)99 info['preview'] = path100 result['info'] = self.previewTemplate(info=info)101 return json.dumps(result)102 else:103 data = self.context.openFile(path)104 try:105 data = data.read()106 if six.PY2 and isinstance(data, six.text_type):107 result['contents'] = data.encode('utf8')108 else:109 result['contents'] = safe_unicode(data)110 try:111 return json.dumps(result)112 except UnicodeDecodeError:113 # The file we're trying to get isn't unicode encodable114 # so we just return the file information, not the content115 del result['contents']116 obj = self.getObject(path)117 info = self.getInfo(obj)118 result['info'] = self.previewTemplate(info=info)119 return json.dumps(result)120 except AttributeError:121 return None122 def normalizePath(self, path):123 if path.startswith('/'):124 path = path[1:]125 if path.endswith('/'):126 path = path[:-1]127 return path128 def normalizeReturnPath(self, path):129 if path.endswith('/'):130 path = path[:-1]131 if not path.startswith('/'):132 path = '/' + path133 return path134 def parentPath(self, path):135 return 
'/'.join(path.split('/')[:-1])136 def getInfo(self, obj, path='/'):137 """Returns information about a single file. Requests138 with mode "getinfo" will include an additional parameter, "path",139 indicating which file to inspect. A boolean parameter "getsize"140 indicates whether the dimensions of the file (if an image) should be141 returned.142 """143 filename = obj.__name__144 properties = {145 'dateModified': None,146 }147 size = 0148 if isinstance(obj, File):149 properties['dateModified'] = DateTime(obj._p_mtime).strftime('%c')150 size = obj.get_size() / 1024151 if IResourceDirectory.providedBy(obj):152 fileType = 'dir'153 is_folder = True154 else:155 fileType = self.getExtension(obj)156 is_folder = False157 if isinstance(obj, FilesystemFile):158 stats = os.stat(obj.path)159 modified = localtime(stats.st_mtime)160 properties['dateModified'] = strftime('%c', modified)161 size = stats.st_size / 1024162 if size < 1024:163 size_specifier = u'kb'164 else:165 size_specifier = u'mb'166 size = size / 1024167 properties['size'] = '{0}{1}'.format(168 size,169 translate(_(u'filemanager_{0}'.format(size_specifier),170 default=size_specifier), context=self.request)171 )172 if isinstance(obj, Image):173 properties['height'] = obj.height174 properties['width'] = obj.width175 return {176 'filename': filename,177 'label': filename,178 'fileType': fileType,179 'filesystem': isinstance(obj, FilesystemFile),180 'properties': properties,181 'path': path,182 'folder': is_folder183 }184 def saveFile(self, path, value):185 path = path.lstrip('/')186 if six.PY2:187 path = safe_encode(path, 'utf-8')188 value = value.strip()189 if six.PY2:190 value = safe_encode(value, 'utf-8')191 value = value.replace('\r\n', '\n')192 if path in self.context:193 if IResourceDirectory.providedBy(self.context[path]):194 return json.dumps({'error': 'invalid path'})195 if 'relativeUrls' in self.request.form:196 reg = re.compile(r'url\(([^)]+)\)')197 urls = reg.findall(value)198 # Trim off the 
@@plone.resourceeditor bit to just give us the199 # theme url200 limit = self.request.URL.find('@@plone.resourceeditor')201 location = self.request.URL[0:limit]202 base = urlparse(location)203 for url in urls:204 asset = urlparse(url.strip("'").strip('"'))205 if base.netloc != asset.netloc:206 continue207 base_dir = '.' + posixpath.dirname(base.path)208 target = '.' + asset.path209 out = posixpath.relpath(target, start=base_dir)210 value = value.replace(url.strip('"').strip("'"), out)211 self.request.response.setHeader('Content-Type', 'application/json')212 if isinstance(self.context, FilesystemResourceDirectory):213 # we cannot save in an FS directory, but we return the file content214 # (useful when we compile less from the theming editor)215 return json.dumps({'success': 'tmp', 'value': value})216 else:217 self.context.writeFile(path, value)218 return json.dumps({'success': 'save'})219 def addFolder(self, path, name):220 """Create a new directory on the server within the given path.221 """222 if six.PY2:223 path = safe_encode(path, 'utf-8')224 name = safe_encode(name, 'utf-8')225 code = 0226 error = ''227 parentPath = self.normalizePath(path)228 parent = None229 try:230 parent = self.getObject(parentPath)231 except KeyError:232 error = translate(_(u'filemanager_invalid_parent',233 default=u'Parent folder not found.'),234 context=self.request)235 code = 1236 else:237 if not validateFilename(name):238 error = translate(_(u'filemanager_invalid_foldername',239 default=u'Invalid folder name.'),240 context=self.request)241 code = 1242 elif name in parent:243 error = translate(_(u'filemanager_error_folder_exists',244 default=u'Folder already exists.'),245 context=self.request)246 code = 1247 else:248 try:249 parent.makeDirectory(name)250 except UnicodeDecodeError:251 error = translate(252 _(253 u'filemanager_invalid_foldername',254 default=u'Invalid folder name.'255 ),256 context=self.request257 )258 code = 1259 self.request.response.setHeader('Content-Type', 
'application/json')260 return json.dumps({261 'parent': self.normalizeReturnPath(parentPath),262 'name': name,263 'error': error,264 'code': code,265 })266 def addFile(self, path, name):267 """Add a new empty file in the given directory268 """269 if six.PY2:270 path = safe_encode(path, 'utf-8')271 name = safe_encode(name, 'utf-8')272 error = ''273 code = 0274 parentPath = self.normalizePath(path)275 newPath = '{0}/{1}'.format(parentPath, name,)276 try:277 parent = self.getObject(parentPath)278 except KeyError:279 error = translate(_(u'filemanager_invalid_parent',280 default=u'Parent folder not found.'),281 context=self.request)282 code = 1283 else:284 if not validateFilename(name):285 error = translate(_(u'filemanager_invalid_filename',286 default=u'Invalid file name.'),287 context=self.request)288 code = 1289 elif name in parent:290 error = translate(_(u'filemanager_error_file_exists',291 default=u'File already exists.'),292 context=self.request)293 code = 1294 else:295 self.resourceDirectory.writeFile(newPath, b'')296 self.request.response.setHeader('Content-Type', 'application/json')297 return json.dumps({298 'parent': self.normalizeReturnPath(parentPath),299 'name': name,300 'error': error,301 'code': code,302 'path': path303 })304 def delete(self, path):305 """Delete the item at the given path.306 """307 if six.PY2:308 path = safe_encode(path, 'utf-8')309 npath = self.normalizePath(path)310 parentPath = '/'.join(npath.split('/')[:-1])311 name = npath.split('/')[-1]312 code = 0313 error = ''314 try:315 parent = self.getObject(parentPath)316 except KeyError:317 error = translate(_(u'filemanager_invalid_parent',318 default=u'Parent folder not found.'),319 context=self.request)320 code = 1321 else:322 try:323 del parent[name]324 except KeyError:325 error = translate(_(u'filemanager_error_file_not_found',326 default=u'File not found.'),327 context=self.request)328 code = 1329 self.request.response.setHeader('Content-Type', 'application/json')330 return 
json.dumps({331 'path': self.normalizeReturnPath(path),332 'error': error,333 'code': code,334 })335 def renameFile(self, path, newName):336 """Rename the item at the given path to the new name337 """338 if six.PY2:339 path = safe_encode(path, 'utf-8')340 newName = safe_encode(newName, 'utf-8')341 npath = self.normalizePath(path)342 oldPath = newPath = '/'.join(npath.split('/')[:-1])343 oldName = npath.split('/')[-1]344 code = 0345 error = ''346 try:347 parent = self.getObject(oldPath)348 except KeyError:349 error = translate(_(u'filemanager_invalid_parent',350 default=u'Parent folder not found.'),351 context=self.request)352 code = 1353 else:354 if newName != oldName:355 if newName in parent:356 error = translate(357 _(358 u'filemanager_error_file_exists',359 default=u'File already exists.'360 ),361 context=self.request)362 code = 1363 else:364 parent.rename(oldName, newName)365 self.request.response.setHeader('Content-Type', 'application/json')366 return json.dumps({367 'oldParent': self.normalizeReturnPath(oldPath),368 'oldName': oldName,369 'newParent': self.normalizeReturnPath(newPath),370 'newName': newName,371 'error': error,372 'code': code,373 })374 def move(self, path, directory):375 """Move the item at the given path to a new directory376 """377 if six.PY2:378 path = safe_encode(path, 'utf-8')379 directory = safe_encode(directory, 'utf-8')380 npath = self.normalizePath(path)381 newParentPath = self.normalizePath(directory)382 parentPath = self.parentPath(npath)383 filename = npath.split('/')[-1]384 code = 0385 error = ''386 newCanonicalPath = '{0}/{1}'.format(newParentPath, filename)387 try:388 parent = self.getObject(parentPath)389 except KeyError:390 error = translate(_(u'filemanager_invalid_parent',391 default=u'Parent folder not found.'),392 context=self.request)393 code = 1394 return json.dumps({395 'code': code,396 'error': error,397 'newPath': self.normalizeReturnPath(newCanonicalPath),398 })399 try:400 target = self.getObject(newParentPath)401 
except KeyError:402 error = translate(_(u'filemanager_error_folder_exists',403 default=u'Destination folder not found.'),404 context=self.request)405 code = 1406 return json.dumps({407 'code': code,408 'error': error,409 'newPath': self.normalizeReturnPath(newCanonicalPath),410 })411 if filename not in parent:412 error = translate(_(u'filemanager_error_file_not_found',413 default=u'File not found.'),414 context=self.request)415 code = 1416 elif filename in target:417 error = translate(_(u'filemanager_error_file_exists',418 default=u'File already exists.'),419 context=self.request)420 code = 1421 else:422 obj = parent[filename]423 del parent[filename]424 target[filename] = obj425 return json.dumps({426 'code': code,427 'error': error,428 'newPath': self.normalizeReturnPath(newCanonicalPath),429 })430 def do_action(self, action):431 if action == 'dataTree':432 def getDirectory(folder, relpath=''):433 items = []434 for name in folder.listDirectory():435 obj = folder[name]436 path = relpath + '/' + name437 if IResourceDirectory.providedBy(obj):438 try:439 children = getDirectory(obj, path)440 except NotFound:441 children = []442 items.append({443 'label': name,444 'folder': True,445 'path': path,446 'children': children447 })448 else:449 items.append(self.getInfo(obj, path))450 return items451 return json.dumps(getDirectory(self.context))452 if action == 'getFile':453 path = self.request.get('path', '')454 return self.getFile(path)455 if action == 'saveFile':456 path = self.request.get('path', '')457 data = self.request.get('data', '')458 return self.saveFile(path, data)459 if action == 'addFolder':460 path = self.request.get('path', '')461 name = self.request.get('name', '')462 return self.addFolder(path, name)463 if action == 'addFile':464 path = self.request.get('path', '')465 name = self.request.get('filename', '')466 return self.addFile(path, name)467 if action == 'renameFile':468 path = self.request.get('path', '')469 name = self.request.get('filename', '')470 
return self.renameFile(path, name)471 if action == 'delete':472 path = self.request.get('path', '')473 return self.delete(path)474 if action == 'move':475 src_path = self.request.get('source', '')476 des_path = self.request.get('destination', '')477 return self.move(src_path, des_path)478 def download(self, path):479 """Serve the requested file to the user480 """481 if six.PY2:482 path = safe_encode(path, 'utf-8')483 npath = self.normalizePath(path)484 parentPath = '/'.join(npath.split('/')[:-1])485 name = npath.split('/')[-1]486 parent = self.getObject(parentPath)487 self.request.response.setHeader('Content-Type',488 'application/octet-stream')489 self.request.response.setHeader(490 'Content-Disposition',491 'attachment; filename="{0}"'.format(name)492 )493 # TODO: Use streams here if we can494 return parent.readFile(name)495 def __call__(self):496 action = self.request.get('action')497 return self.do_action(action)498class FileManager(BrowserView):499 """Render the file manager and support its AJAX requests.500 """501 previewTemplate = ViewPageTemplateFile('preview.pt')502 staticFiles = '++resource++plone.resourceeditor/filemanager'503 imageExtensions = ['png', 'gif', 'jpg', 'jpeg']504 knownExtensions = ['css', 'html', 'htm', 'txt', 'xml', 'js', 'cfg']505 capabilities = ['download', 'rename', 'delete']506 extensionsWithIcons = frozenset([507 'aac', 'avi', 'bmp', 'chm', 'css', 'dll', 'doc', 'fla',508 'gif', 'htm', 'html', 'ini', 'jar', 'jpeg', 'jpg', 'js',509 'lasso', 'mdb', 'mov', 'mp3', 'mpg', 'pdf', 'php', 'png',510 'ppt', 'py', 'rb', 'real', 'reg', 'rtf', 'sql', 'swf', 'txt',511 'vbs', 'wav', 'wma', 'wmv', 'xls', 'xml', 'xsl', 'zip',512 ])513 protectedActions = (514 'addfolder', 'add', 'addnew',515 'rename', 'delete'516 )517 def pattern_options(self):518 site = getSite()519 viewName = '@@plone.resourceeditor.filemanager-actions'520 return json.dumps({521 'actionUrl': '{0}/++{1}++{2}/{3}'.format(522 site.absolute_url(),523 
self.context.__parent__.__parent__.__name__,524 self.context.__name__,525 viewName526 )527 })528 def mode_selector(self, form):529 # AJAX methods called by the file manager530 mode = form['mode']531 if mode in self.protectedActions:532 authorize(self.context, self.request)533 response = {'error:': 'Unknown request', 'code': -1}534 textareaWrap = False535 if mode == u'getfolder':536 response = self.getFolder(537 path=urllib.parse.unquote(form['path']),538 getSizes=form.get('getsizes', 'false') == 'true'539 )540 elif mode == u'getinfo':541 response = self.getInfo(542 path=urllib.parse.unquote(form['path']),543 getSize=form.get('getsize', 'false') == 'true'544 )545 elif mode == u'addfolder':546 response = self.addFolder(547 path=urllib.parse.unquote(form['path']),548 name=urllib.parse.unquote(form['name'])549 )550 elif mode == u'add':551 textareaWrap = True552 response = self.add(553 path=urllib.parse.unquote(form['currentpath']),554 newfile=form['newfile'],555 replacepath=form.get('replacepath', None)556 )557 elif mode == u'addnew':558 response = self.addNew(559 path=urllib.parse.unquote(form['path']),560 name=urllib.parse.unquote(form['name'])561 )562 elif mode == u'rename':563 response = self.rename(564 path=urllib.parse.unquote(form['old']),565 newName=urllib.parse.unquote(form['new'])566 )567 elif mode == u'delete':568 response = self.delete(569 path=urllib.parse.unquote(form['path'])570 )571 elif mode == 'move':572 response = self.move(573 path=urllib.parse.unquote(form['path']),574 directory=urllib.parse.unquote(form['directory'])575 )576 elif mode == u'download':577 return self.download(578 path=urllib.parse.unquote(form['path'])579 )580 if textareaWrap:581 self.request.response.setHeader('Content-Type', 'text/html')582 return '<textarea>{0}</textarea>'.format(json.dumps(response))583 self.request.response.setHeader('Content-Type',584 'application/json')585 return json.dumps(response)586 def __call__(self):587 # make sure theme is disable for these requests588 
self.request.response.setHeader('X-Theme-Disabled', 'True')589 self.request['HTTP_X_THEME_ENABLED'] = False590 self.setup()591 form = self.request.form592 # AJAX methods called by the file manager593 if 'mode' in form:594 return self.mode_selector(form)595 # Rendering the view596 else:597 return self.index()598 def setup(self):599 pass600 @zproperty.Lazy601 def portalUrl(self):602 return getToolByName(self.context, 'portal_url')()603 @zproperty.Lazy604 def resourceDirectory(self):605 return self.context606 @zproperty.Lazy607 def resourceType(self):608 return self.resourceDirectory.__parent__.__parent__.__name__609 @zproperty.Lazy610 def baseUrl(self):611 return '{0}/++{1}++{2}'.format(612 self.portalUrl,613 self.resourceType,614 self.resourceDirectory.__name__615 )616 @zproperty.Lazy617 def fileConnector(self):618 return '{0}/@@{1}'.format(self.baseUrl, self.__name__,)619 @zproperty.Lazy620 def filemanagerConfiguration(self):621 return """\622var FILE_ROOT = '/';623var IMAGES_EXT = {0};624var CAPABILITIES = {1};625var FILE_CONNECTOR = '{2}';626var BASE_URL = '{3}';627""".format(628 repr(self.imageExtensions),629 repr(self.capabilities),630 self.fileConnector,631 self.baseUrl,632 )633 def normalizePath(self, path):634 if path.startswith('/'):635 path = path[1:]636 if path.endswith('/'):637 path = path[:-1]638 return path639 def normalizeReturnPath(self, path):640 if path.endswith('/'):641 path = path[:-1]642 if not path.startswith('/'):643 path = '/' + path644 return path645 def parentPath(self, path):646 return '/'.join(path.split('/')[:-1])647 # AJAX responses648 def getFolder(self, path, getSizes=False):649 """Returns a dict of file and folder objects representing the650 contents of the given directory (indicated by a "path" parameter). The651 values are dicts as returned by getInfo().652 A boolean parameter "getsizes" indicates whether image dimensions653 should be returned for each item. 
Folders should always be returned654 before files.655 Optionally a "type" parameter can be specified to restrict returned656 files (depending on the connector). If a "type" parameter is given for657 the HTML document, the same parameter value is reused and passed658 to getFolder(). This can be used for example to only show image files659 in a file system tree.660 """661 if six.PY2:662 path = safe_encode(path, 'utf-8')663 folders = []664 files = []665 path = self.normalizePath(path)666 folder = self.getObject(path)667 for name in folder.listDirectory():668 if IResourceDirectory.providedBy(folder[name]):669 folders.append(self.getInfo(670 path='{0}/{1}/'.format(path, name), getSize=getSizes))671 else:672 files.append(self.getInfo(673 path='{0}/{1}'.format(path, name), getSize=getSizes))674 return folders + files675 def getInfo(self, path, getSize=False):676 """Returns information about a single file. Requests677 with mode "getinfo" will include an additional parameter, "path",678 indicating which file to inspect. 
A boolean parameter "getsize"679 indicates whether the dimensions of the file (if an image) should be680 returned.681 """682 if six.PY2:683 path = safe_encode(path, 'utf-8')684 path = self.normalizePath(path)685 obj = self.getObject(path)686 filename = obj.__name__687 error = ''688 errorCode = 0689 properties = {690 'dateModified': None,691 }692 if isinstance(obj, File):693 properties['dateModified'] = DateTime(obj._p_mtime).strftime('%c')694 size = obj.get_size() / 1024695 if size < 1024:696 size_specifier = u'kb'697 else:698 size_specifier = u'mb'699 size = size / 1024700 properties['size'] = '{0}{1}'.format(701 size,702 translate(_(u'filemanager_{0}'.format(size_specifier),703 default=size_specifier), context=self.request)704 )705 fileType = 'txt'706 siteUrl = self.portalUrl707 resourceName = self.resourceDirectory.__name__708 preview = '{0}/{1}/images/fileicons/default.png'.format(709 siteUrl,710 self.staticFiles711 )712 if IResourceDirectory.providedBy(obj):713 preview = '{0}/{1}/images/fileicons/_Open.png'.format(714 siteUrl,715 self.staticFiles716 )717 fileType = 'dir'718 path = path + '/'719 else:720 fileType = self.getExtension(path, obj)721 if fileType in self.imageExtensions:722 preview = '{0}/++{1}++{2}/{3}'.format(723 siteUrl,724 self.resourceType,725 resourceName,726 path727 )728 elif fileType in self.extensionsWithIcons:729 preview = '{0}/{1}/images/fileicons/{2}.png'.format(730 siteUrl,731 self.staticFiles,732 fileType733 )734 if isinstance(obj, Image):735 properties['height'] = obj.height736 properties['width'] = obj.width737 return {738 'path': self.normalizeReturnPath(path),739 'filename': filename,740 'fileType': fileType,741 'preview': preview,742 'properties': properties,743 'error': error,744 'code': errorCode,745 }746 def addFolder(self, path, name):747 """Create a new directory on the server within the given path.748 """749 if six.PY2:750 path = safe_encode(path, 'utf-8')751 name = safe_encode(name, 'utf-8')752 code = 0753 error = ''754 
parentPath = self.normalizePath(path)755 parent = None756 try:757 parent = self.getObject(parentPath)758 except KeyError:759 error = translate(_(u'filemanager_invalid_parent',760 default=u'Parent folder not found.'),761 context=self.request)762 code = 1763 else:764 if not validateFilename(name):765 error = translate(_(u'filemanager_invalid_foldername',766 default=u'Invalid folder name.'),767 context=self.request)768 code = 1769 elif name in parent:770 error = translate(_(u'filemanager_error_folder_exists',771 default=u'Folder already exists.'),772 context=self.request)773 code = 1774 else:775 try:776 parent.makeDirectory(name)777 except UnicodeDecodeError:778 error = translate(779 _(780 u'filemanager_invalid_foldername',781 default=u'Invalid folder name.'782 ),783 context=self.request784 )785 code = 1786 return {787 'parent': self.normalizeReturnPath(parentPath),788 'name': name,789 'error': error,790 'code': code,791 }792 def add(self, path, newfile, replacepath=None):793 """Add the uploaded file to the specified path. Unlike794 the other methods, this method must return its JSON response wrapped in795 an HTML <textarea>, so the MIME type of the response is text/html796 instead of text/plain. The upload form in the File Manager passes the797 current path as a POST param along with the uploaded file. The response798 includes the path as well as the name used to store the file. 
The799 uploaded file's name should be safe to use as a path component in a800 URL, so URL-encoded at a minimum.801 """802 if six.PY2:803 path = safe_encode(path, 'utf-8')804 if six.PY2 and replacepath is not None:805 replacepath = safe_encode(replacepath, 'utf-8')806 parentPath = self.normalizePath(path)807 error = ''808 code = 0809 name = newfile.filename810 if six.PY2 and isinstance(name, six.text_type):811 name = safe_encode(name, 'utf-8')812 if replacepath:813 newPath = replacepath814 parentPath = '/'.join(replacepath.split('/')[:-1])815 else:816 newPath = '{0}/{1}'.format(parentPath, name,)817 try:818 parent = self.getObject(parentPath)819 except KeyError:820 error = translate(_(u'filemanager_invalid_parent',821 default=u'Parent folder not found.'),822 context=self.request)823 code = 1824 else:825 if name in parent and not replacepath:826 error = translate(_(u'filemanager_error_file_exists',827 default=u'File already exists.'),828 context=self.request)829 code = 1830 else:831 try:832 self.resourceDirectory.writeFile(newPath, newfile)833 except ValueError:834 error = translate(_(u'filemanager_error_file_invalid',835 default=u'Could not read file.'),836 context=self.request)837 code = 1838 return {839 'parent': self.normalizeReturnPath(parentPath),840 'path': self.normalizeReturnPath(path),841 'name': name,842 'error': error,843 'code': code,844 }845 def addNew(self, path, name):846 """Add a new empty file in the given directory847 """848 if six.PY2:849 path = safe_encode(path, 'utf-8')850 name = safe_encode(name, 'utf-8')851 error = ''852 code = 0853 parentPath = self.normalizePath(path)854 newPath = '{0}/{1}'.format(parentPath, name,)855 try:856 parent = self.getObject(parentPath)857 except KeyError:858 error = translate(_(u'filemanager_invalid_parent',859 default=u'Parent folder not found.'),860 context=self.request)861 code = 1862 else:863 if not validateFilename(name):864 error = translate(_(u'filemanager_invalid_filename',865 default=u'Invalid file 
name.'),866 context=self.request)867 code = 1868 elif name in parent:869 error = translate(_(u'filemanager_error_file_exists',870 default=u'File already exists.'),871 context=self.request)872 code = 1873 else:874 self.resourceDirectory.writeFile(newPath, b'')875 return {876 'parent': self.normalizeReturnPath(parentPath),877 'name': name,878 'error': error,879 'code': code,880 }881 def rename(self, path, newName):882 """Rename the item at the given path to the new name883 """884 if six.PY2:885 path = safe_encode(path, 'utf-8')886 newName = safe_encode(newName, 'utf-8')887 npath = self.normalizePath(path)888 oldPath = newPath = '/'.join(npath.split('/')[:-1])889 oldName = npath.split('/')[-1]890 code = 0891 error = ''892 try:893 parent = self.getObject(oldPath)894 except KeyError:895 error = translate(_(u'filemanager_invalid_parent',896 default=u'Parent folder not found.'),897 context=self.request)898 code = 1899 else:900 if newName != oldName:901 if newName in parent:902 error = translate(903 _(u'filemanager_error_file_exists',904 default=u'File already exists.'),905 context=self.request)906 code = 1907 else:908 parent.rename(oldName, newName)909 return {910 'oldParent': self.normalizeReturnPath(oldPath),911 'oldName': oldName,912 'newParent': self.normalizeReturnPath(newPath),913 'newName': newName,914 'error': error,915 'code': code,916 }917 def delete(self, path):918 """Delete the item at the given path.919 """920 if six.PY2:921 path = safe_encode(path, 'utf-8')922 npath = self.normalizePath(path)923 parentPath = '/'.join(npath.split('/')[:-1])924 name = npath.split('/')[-1]925 code = 0926 error = ''927 try:928 parent = self.getObject(parentPath)929 except KeyError:930 error = translate(_(u'filemanager_invalid_parent',931 default=u'Parent folder not found.'),932 context=self.request)933 code = 1934 else:935 try:936 del parent[name]937 except KeyError:938 error = translate(_(u'filemanager_error_file_not_found',939 default=u'File not found.'),940 
context=self.request)941 code = 1942 return {943 'path': self.normalizeReturnPath(path),944 'error': error,945 'code': code,946 }947 def move(self, path, directory):948 """Move the item at the given path to a new directory949 """950 if six.PY2:951 path = safe_encode(path, 'utf-8')952 directory = safe_encode(directory, 'utf-8')953 npath = self.normalizePath(path)954 newParentPath = self.normalizePath(directory)955 parentPath = self.parentPath(npath)956 filename = npath.split('/')[-1]957 code = 0958 error = ''959 newCanonicalPath = '{0}/{1}'.format(newParentPath, filename)960 try:961 parent = self.getObject(parentPath)962 except KeyError:963 error = translate(_(u'filemanager_invalid_parent',964 default=u'Parent folder not found.'),965 context=self.request)966 code = 1967 return {968 'code': code,969 'error': error,970 'newPath': self.normalizeReturnPath(newCanonicalPath),971 }972 try:973 target = self.getObject(newParentPath)974 except KeyError:975 error = translate(_(u'filemanager_error_folder_exists',976 default=u'Destination folder not found.'),977 context=self.request)978 code = 1979 return {980 'code': code,981 'error': error,982 'newPath': self.normalizeReturnPath(newCanonicalPath),983 }984 if filename not in parent:985 error = translate(_(u'filemanager_error_file_not_found',986 default=u'File not found.'),987 context=self.request)988 code = 1989 elif filename in target:990 error = translate(_(u'filemanager_error_file_exists',991 default=u'File already exists.'),992 context=self.request)993 code = 1994 else:995 obj = parent[filename]996 del parent[filename]997 target[filename] = obj998 return {999 'code': code,1000 'error': error,1001 'newPath': self.normalizeReturnPath(newCanonicalPath),1002 }1003 def download(self, path):1004 """Serve the requested file to the user1005 """1006 if six.PY2:1007 path = safe_encode(path, 'utf-8')1008 npath = self.normalizePath(path)1009 parentPath = '/'.join(npath.split('/')[:-1])1010 name = npath.split('/')[-1]1011 parent = 
self.getObject(parentPath)1012 self.request.response.setHeader('Content-Type',1013 'application/octet-stream')1014 self.request.response.setHeader(1015 'Content-Disposition',1016 'attachment; filename="{0}"'.format(name)1017 )1018 # TODO: Use streams here if we can1019 return parent.readFile(name)1020 # Helpers1021 def getObject(self, path):1022 path = self.normalizePath(path)1023 if not path:1024 return self.resourceDirectory1025 try:1026 return self.resourceDirectory[path]1027 except (KeyError, NotFound,):1028 raise KeyError(path)1029 def getExtension(self, path, obj):1030 basename, ext = os.path.splitext(path)1031 ext = ext[1:].lower()1032 ct = obj.getContentType()1033 if ct:1034 # take content type of the file over extension if available1035 if '/' in ct:1036 _ext = ct.split('/')[1].lower()1037 if _ext in self.extensionsWithIcons:1038 return _ext1039 return ext1040 # Methods that are their own views1041 def getFile(self, path):1042 self.setup()1043 if six.PY2:1044 path = safe_encode(path, 'utf-8')1045 path = self.normalizePath(path)1046 file = self.context.context.unrestrictedTraverse(path)1047 ext = self.getExtension(path, file)1048 result = {'ext': ext}1049 if ext not in self.imageExtensions:1050 result['contents'] = str(file.data)1051 else:1052 info = self.getInfo(path)1053 result['info'] = self.previewTemplate(info=info)1054 self.request.response.setHeader('Content-Type', 'application/json')1055 return json.dumps(result)1056 def saveFile(self, path, value):1057 path = self.request.form.get('path', path)1058 value = self.request.form.get('value', value)1059 if six.PY2:1060 path = safe_encode(path, 'utf-8')1061 value = safe_encode(value, 'utf-8')1062 path = path.lstrip('/')1063 value = value.replace('\r\n', '\n')1064 self.context.writeFile(path, value)1065 return ' ' # Zope no likey empty responses1066 def filetree(self):1067 foldersOnly = bool(self.request.get('foldersOnly', False))1068 def getFolder(root, relpath=''):1069 result = []1070 for name in 
root.listDirectory():1071 path = '{0}/{1}'.format(relpath, name)1072 if IResourceDirectory.providedBy(root[name]):1073 item = {1074 'title': name,1075 'key': path,...

Full Screen

Full Screen

provenance.py

Source: provenance.py (GitHub)

copy

Full Screen

# ... (excerpt begins mid-function; the enclosing ``def`` is not shown and the
# indentation of this fragment is inferred)
#         out = '%.10f'.format(object)
#     else:
#         out = object
#     return out
# NOTE(review): '%.10f'.format(object) contains no ``{}`` replacement field, so
# it always evaluates to the literal string '%.10f'. '{:.10f}'.format(object)
# was presumably intended -- confirm against the full function.


def safe_encode(x, as_literal=True):
    """
    Encodes a python value for prov

    The value is dispatched on its type:

    * ``None`` becomes the string ``"Unknown"``.
    * ``str``/``bytes`` (bytes are decoded as UTF-8): if the text names an
      existing filesystem path it becomes ``file://<host><absolute path>``,
      otherwise the text itself, clipped to ``max_text_len`` characters.
    * ``int`` and ``float`` are passed through.
    * ``dict``, ``list`` and ``tuple`` are encoded element-wise and
      serialised with ``json.dumps``.
    * ``pm.Literal`` is returned unchanged, or re-serialised with ``dumps``.
    * anything else is serialised from its ``__dict__`` with ``json.dumps``,
      falling back to ``dumps(x)``.

    Args:
        x: the value to encode.
        as_literal: when true (the default) the encoded value is wrapped in a
            ``pm.Literal`` (or ``pm.URIRef`` for existing paths); when false
            the plain Python value/string is returned instead.

    Returns:
        The encoded value, wrapped or plain according to ``as_literal``.
    """
    if x is None:
        value = "Unknown"
        if as_literal:
            return pm.Literal(value, pm.XSD['string'])
        return value

    if isinstance(x, (str, bytes)):
        if isinstance(x, bytes):
            x = str(x, 'utf-8')
        if os.path.exists(x):
            # Always absolutise. The original guarded this call with
            # ``x[0] != os.pathsep``, but os.pathsep is the PATH-list
            # separator (':' or ';'), not the directory separator, so an
            # existing relative file whose name starts with that character
            # was emitted as a relative ``file://`` URI. abspath() is a no-op
            # (apart from normalisation) on paths that are already absolute.
            x = os.path.abspath(x)
            value = 'file://{}{}'.format(platform.node().lower(), x)
            if not as_literal:
                return value
            try:
                return pm.URIRef(value)
            except AttributeError:
                # ``pm`` has no URIRef attribute: fall back to a typed literal.
                return pm.Literal(value, pm.XSD['anyURI'])
        else:
            value = x
            if len(x) > max_text_len:
                # Keep the result exactly max_text_len characters long,
                # marker included.
                cliptxt = '...Clipped...'
                value = x[:max_text_len - len(cliptxt)] + cliptxt
            if not as_literal:
                return value
            return pm.Literal(value, pm.XSD['string'])

    if isinstance(x, int):
        if not as_literal:
            return x
        return pm.Literal(int(x), pm.XSD['integer'])

    if isinstance(x, float):
        if not as_literal:
            return x
        return pm.Literal(x, pm.XSD['float'])

    if isinstance(x, dict):
        outdict = {}
        for key, value in x.items():
            encoded_value = safe_encode(value, as_literal=False)
            if isinstance(encoded_value, pm.Literal):
                outdict[key] = encoded_value.json_representation()
            else:
                outdict[key] = encoded_value
        try:
            jsonstr = json.dumps(outdict)
        except (UnicodeDecodeError, TypeError) as excp:
            # On Python 3 json.dumps reports unserialisable keys/values with
            # TypeError, so that is caught too; otherwise this fallback could
            # never trigger.
            jsonstr = "Could not encode dictionary. {}".format(excp)
            iflogger.warn('Prov: %s', jsonstr)
        if not as_literal:
            return jsonstr
        return pm.Literal(jsonstr, pm.XSD['string'])

    if isinstance(x, (list, tuple)):
        x = list(x)
        is_object = False
        try:
            nptype = np.array(x).dtype
            is_object = nptype == np.dtype(object)
        except ValueError:
            is_object = True
        # If the array contains an heterogeneous mixture of data types
        # they should be encoded sequentially
        if is_object:
            outlist = []
            for value in x:
                encoded_value = safe_encode(value, as_literal=False)
                if isinstance(encoded_value, pm.Literal):
                    outlist.append(encoded_value.json_representation())
                else:
                    outlist.append(encoded_value)
            x = outlist
        try:
            jsonstr = json.dumps(x)
        except (UnicodeDecodeError, TypeError) as excp:
            # See the dict branch: TypeError is the Python 3 failure mode.
            jsonstr = "Could not encode list/tuple. {}".format(excp)
            iflogger.warn('Prov: %s', jsonstr)
        if not as_literal:
            return jsonstr
        return pm.Literal(jsonstr, pm.XSD['string'])

    # If is a literal, and as_literal do nothing.
    # else bring back to json.
    if isinstance(x, pm.Literal):
        if as_literal:
            return x
        # NOTE(review): this calls ``dumps`` (defined outside this excerpt),
        # not ``json.dumps`` -- confirm that is intended.
        return dumps(x.json_representation())

    jsonstr = None
    ltype = pm.XSD['string']
    try:
        jsonstr = json.dumps(x.__dict__)
    except (AttributeError, TypeError):
        # No __dict__, or a __dict__ holding values json cannot serialise:
        # leave jsonstr as None so the ``dumps`` fallback below is tried
        # instead of letting the TypeError escape.
        pass
    if jsonstr is None:
        try:
            jsonstr = dumps(x)
            ltype = nipype_ns['pickle']
        except TypeError as excp:
            jsonstr = 'Could not encode object. {}'.format(excp)
    if not as_literal:
        return jsonstr
    return pm.Literal(jsonstr, ltype)


def prov_encode(graph, value, create_container=True):
    """
    Encode ``value`` as an entity of ``graph`` and return that entity.

    A list/tuple is turned into a collection whose members are the encoded
    items, provided ``create_container`` is true and every non-sequence item
    encodes to a string containing ``file://``. An empty sequence is encoded
    as a single entity, a one-element sequence as its only element, and any
    other sequence is encoded as one plain value (``create_container=False``).

    For a plain value that is an existing path, the entity also gets a
    location attribute and, for non-directories, a sha512 attribute.
    """
    if isinstance(value, (list, tuple)) and create_container:
        value = list(value)
        if len(value) == 0:
            encoded_literal = safe_encode(value)
            attr = {pm.PROV['value']: encoded_literal}
            eid = get_attr_id(attr)
            return graph.entity(eid, attr)

        if len(value) == 1:
            return prov_encode(graph, value[0])

        entities = []
        for item in value:
            item_entity = prov_encode(graph, item)
            entities.append(item_entity)
            if isinstance(item, (list, tuple)):
                continue
            item_entity_val = list(item_entity.value)[0]
            is_str = isinstance(item_entity_val, str)
            # Any member that is not a file URI means the sequence is
            # re-encoded as a single value instead of a collection.
            if not is_str or (is_str and 'file://' not in item_entity_val):
                return prov_encode(graph, value, create_container=False)

        eid = get_id()
        entity = graph.collection(identifier=eid)
        for item_entity in entities:
            graph.hadMember(eid, item_entity)
        return entity
    else:
        encoded_literal = safe_encode(value)
        attr = {pm.PROV['value']: encoded_literal}
        if isinstance(value, str) and os.path.exists(value):
            attr.update({pm.PROV['location']: encoded_literal})
            if not os.path.isdir(value):
                sha512 = hash_infile(value, crypto=hashlib.sha512)
                attr.update({crypto['sha512']: pm.Literal(sha512,
                                                          pm.XSD['string'])})
                # Location and value are left out of the id computation for
                # files; only the location is left out for directories.
                eid = get_attr_id(attr, skip=[pm.PROV['location'],
                                              pm.PROV['value']])
            else:
                eid = get_attr_id(attr, skip=[pm.PROV['location']])
        else:
            eid = get_attr_id(attr)
        entity = graph.entity(eid, attr)
    return entity


def write_provenance(results, filename='provenance', format='all'):
    """
    Add ``results`` to a fresh ``ProvStore`` and write it out.

    Returns whatever ``ProvStore.write_provenance`` returns for the given
    ``filename`` and ``format``.
    """
    ps = ProvStore()
    ps.add_results(results)
    return ps.write_provenance(filename=filename, format=format)


class ProvStore(object):
    def __init__(self):
        # Start from an empty prov document and register the four namespaces.
        self.g = pm.ProvDocument()
        self.g.add_namespace(foaf)
        self.g.add_namespace(dcterms)
        self.g.add_namespace(nipype_ns)
        self.g.add_namespace(niiri)

    # ... (excerpt ends here; the remaining methods of ProvStore are cut off)
add_results(self, results, keep_provenance=False):259 if keep_provenance and results.provenance:260 self.g = deepcopy(results.provenance)261 return self.g262 runtime = results.runtime263 interface = results.interface264 inputs = results.inputs265 outputs = results.outputs266 classname = interface.__name__267 modulepath = "{0}.{1}".format(interface.__module__, interface.__name__)268 activitytype = ''.join([i.capitalize() for i in modulepath.split('.')])269 a0_attrs = {nipype_ns['module']: interface.__module__,270 nipype_ns["interface"]: classname,271 pm.PROV["type"]: nipype_ns[activitytype],272 pm.PROV["label"]: classname,273 nipype_ns['duration']: safe_encode(runtime.duration),274 nipype_ns['workingDirectory']: safe_encode(runtime.cwd),275 nipype_ns['returnCode']: safe_encode(runtime.returncode),276 nipype_ns['platform']: safe_encode(runtime.platform),277 nipype_ns['version']: safe_encode(runtime.version),278 }279 a0_attrs[foaf["host"]] = pm.Literal(runtime.hostname,280 pm.XSD['anyURI'])281 try:282 a0_attrs.update({nipype_ns['command']: safe_encode(runtime.cmdline)})283 a0_attrs.update({nipype_ns['commandPath']:284 safe_encode(runtime.command_path)})285 a0_attrs.update({nipype_ns['dependencies']:286 safe_encode(runtime.dependencies)})287 except AttributeError:288 pass289 a0 = self.g.activity(get_id(), runtime.startTime, runtime.endTime,290 a0_attrs)291 # environment292 id = get_id()293 env_collection = self.g.collection(id)294 env_collection.add_attributes({pm.PROV['type']:295 nipype_ns['Environment'],296 pm.PROV['label']: "Environment"})297 self.g.used(a0, id)298 # write environment entities299 for idx, (key, val) in enumerate(sorted(runtime.environ.items())):300 if key not in PROV_ENVVARS:301 continue302 in_attr = {pm.PROV["label"]: key,303 nipype_ns["environmentVariable"]: key,304 pm.PROV["value"]: safe_encode(val)}305 id = get_attr_id(in_attr)306 self.g.entity(id, in_attr)307 self.g.hadMember(env_collection, id)308 # write input entities309 if inputs:310 id = 
get_id()311 input_collection = self.g.collection(id)312 input_collection.add_attributes({pm.PROV['type']:313 nipype_ns['Inputs'],314 pm.PROV['label']: "Inputs"})315 # write input entities316 for idx, (key, val) in enumerate(sorted(inputs.items())):317 in_entity = prov_encode(self.g, val).identifier318 self.g.hadMember(input_collection, in_entity)319 used_attr = {pm.PROV["label"]: key,320 nipype_ns["inPort"]: key}321 self.g.used(activity=a0, entity=in_entity,322 other_attributes=used_attr)323 # write output entities324 if outputs:325 id = get_id()326 output_collection = self.g.collection(id)327 if not isinstance(outputs, dict):328 outputs = outputs.get_traitsfree()329 output_collection.add_attributes({pm.PROV['type']:330 nipype_ns['Outputs'],331 pm.PROV['label']:332 "Outputs"})333 self.g.wasGeneratedBy(output_collection, a0)334 # write output entities335 for idx, (key, val) in enumerate(sorted(outputs.items())):336 out_entity = prov_encode(self.g, val).identifier337 self.g.hadMember(output_collection, out_entity)338 gen_attr = {pm.PROV["label"]: key,339 nipype_ns["outPort"]: key}340 self.g.generation(out_entity, activity=a0,341 other_attributes=gen_attr)342 # write runtime entities343 id = get_id()344 runtime_collection = self.g.collection(id)345 runtime_collection.add_attributes({pm.PROV['type']:346 nipype_ns['Runtime'],347 pm.PROV['label']:348 "RuntimeInfo"})349 self.g.wasGeneratedBy(runtime_collection, a0)350 for key, value in sorted(runtime.items()):351 if not value:352 continue353 if key not in ['stdout', 'stderr', 'merged']:354 continue355 attr = {pm.PROV["label"]: key,356 nipype_ns[key]: safe_encode(value)}357 id = get_id()358 self.g.entity(get_id(), attr)359 self.g.hadMember(runtime_collection, id)360 # create agents361 user_attr = {pm.PROV["type"]: pm.PROV["Person"],362 pm.PROV["label"]: getpass.getuser(),363 foaf["name"]: safe_encode(getpass.getuser())}364 user_agent = self.g.agent(get_attr_id(user_attr), user_attr)365 agent_attr = {pm.PROV["type"]: 
pm.PROV["SoftwareAgent"],366 pm.PROV["label"]: "Nipype",367 foaf["name"]: safe_encode("Nipype"),368 nipype_ns["version"]: __version__}369 for key, value in list(get_info().items()):370 agent_attr.update({nipype_ns[key]: safe_encode(value)})371 software_agent = self.g.agent(get_attr_id(agent_attr), agent_attr)372 self.g.wasAssociatedWith(a0, user_agent, None, None,373 {pm.PROV["hadRole"]: nipype_ns["LoggedInUser"]})374 self.g.wasAssociatedWith(a0, software_agent)375 return self.g376 def write_provenance(self, filename='provenance', format='all'):377 if format in ['provn', 'all']:378 with open(filename + '.provn', 'wt') as fp:379 fp.writelines(self.g.get_provn())380 try:381 if format in ['rdf', 'all']:382 if len(self.g.bundles) == 0:383 rdf_format = 'turtle'384 ext = '.ttl'...

Full Screen

Full Screen

messages.py

Source:messages.py Github

copy

Full Screen

...9 produced=datetime.datetime.now().date()10 self.produced = produced11 self.expired = expire_date12 def encode(self):13 res = safe_encode(self.produced)14 res += safe_encode(self.expired)15 return res16 def to_dict(self):17 d = {}18 d["produced"] = safe_encode(self.produced)19 d["expired"] = safe_encode(self.expired)20 return d21 @classmethod22 def from_dict(cls, d):23 return cls.parse(d["produced"] + d["expired"])24 @classmethod25 def parse(cls, raw):26 rawstr1 = raw.decode()[ : DATE_LENGTH]27 rawstr2 = raw.decode()[DATE_LENGTH : ]28 res1 = datetime.datetime.strptime(rawstr1, '%Y-%m-%d')29 res2 = datetime.datetime.strptime(rawstr2, '%Y-%m-%d')30 ttl = cls(res2.date(), produced=res1.date())31 return ttl32 def __eq__(self, other):33 ch1 = (self.produced == other.produced)34 ch2 = (self.expired == other.expired)35 return ch1 and ch236 def __str__(self):37 return 'TTL(produced='+str(self.produced)+', expired='+str(self.expired) +')'38class Request:39 def __init__(self, SrcID, UID, scope, ttl, 40 key_pair = None, sig = None):41 self.srcid = SrcID42 self.uid = UID43 self.scope = scope44 self.ttl = ttl45 if sig:46 self.sig = sig47 else:48 content = encode_id(self.srcid)49 content += encode_id(self.uid)50 content += safe_encode(self.scope)51 content += safe_encode(self.ttl)52 self.sig = key_pair.sign(content)53 def content(self):54 res = encode_id(self.srcid)55 res += encode_id(self.uid)56 res += safe_encode(self.scope)57 res += safe_encode(self.ttl)58 return res59 def encode(self):60 res = self.content()61 res += self.sig62 return res 63 def to_dict(self):64 d = {}65 d["srcid"] = self.srcid66 d["uid"] = self.uid67 d["scope"] = self.scope68 d["ttl"] = self.ttl.to_dict()69 d["sig"] = self.sig70 return d71 @classmethod72 def from_dict(cls, d):73 srcid = d["srcid"]74 uid = d["uid"]75 scope = d["scope"]76 ttl = TTL.from_dict(d["ttl"])77 sig = d["sig"]78 return cls(srcid, uid, scope, ttl, sig=sig)79 @classmethod80 def parse(cls, raw):81 sig = raw[-SIG_LENGTH : ]82 raw = 
cut_signature(raw)83 SrcID = parse_number(raw[: ID_LENGTH])84 UID = parse_number(raw[ID_LENGTH : 2*ID_LENGTH])85 ttl = TTL.parse(raw[-TTL_LENGTH : ])86 scope = parse_str(raw[2*ID_LENGTH : -TTL_LENGTH])87 req = cls(SrcID, UID, scope, ttl, sig=sig)88 return req89 def __eq__(self, other):90 ch1 = self.srcid == other.srcid91 ch2 = self.uid == other.uid92 ch3 = self.scope == other.scope93 ch4 = self.ttl == other.ttl94 return ch1 and ch2 and ch3 and ch495 def __str__(self):96 res = 'Request:\n'97 res += ('Service ID: ' + str(self.srcid) + '\n')98 res += ('User ID: ' + str(self.uid) + '\n')99 res += ('Scope: ' + self.scope + '\n')100 res += str(self.ttl)101 return res102 def __hash__(self):103 return hash(self.encode())104class Blob:105 def __init__(self, pub_ephem, UID, reply, 106 key_pair = None, sig = None):107 self.pub = safe_encode(pub_ephem)108 self.uid = UID109 self.reply = reply110 if sig:111 self.sig = sig112 else:113 content = self.pub114 content += encode_id(self.uid)115 content += safe_encode(self.reply)116 self.sig = key_pair.sign(content)117 def content(self):118 res = self.pub119 res += encode_id(self.uid)120 res += safe_encode(self.reply)121 return res122 def encode(self):123 return self.content() + self.sig124 def to_dict(self):125 d = {}126 d["pub"] = self.pub127 d["uid"] = self.uid128 d["reply"] = self.reply129 d["sig"] = self.sig130 return d131 @classmethod132 def from_dict(cls, d):133 pub = d["pub"]134 uid = d["uid"]135 reply = d["reply"]136 sig = d["sig"]137 return cls(pub, uid, reply, sig=sig)138 @classmethod139 def parse(cls, raw):140 pub_ephem = raw[: SIG_LENGTH]141 UID = parse_number(raw[SIG_LENGTH : SIG_LENGTH + ID_LENGTH])142 reply = raw[SIG_LENGTH + ID_LENGTH : ]143 sig = reply[-SIG_LENGTH : ]144 reply = cut_signature(reply)145 blob = cls(pub_ephem, UID, reply, sig=sig)146 return blob147 def __str__(self):148 res = "Blob: " + '\n'149 res += "User ID: " + str(self.uid) + '\n'150 res += "Reply (binary): " + str(self.reply) + '\n'151 return 
res152 def __eq__(self, other):153 ch1 = self.pub == other.pub154 ch2 = self.uid == other.uid155 ch3 = self.reply == other.reply156 return ch1 and ch2 and ch3157class ReplyContent:158 def __init__(self, req, secdata, salt=None):159 self.request = req160 self.secdata = secdata161 if salt is None:162 salt = rand_bytes(32)163 self.salt = salt164 def request_len(self):165 length = len(safe_encode(self.request))166 raw_len = (length).to_bytes(REQUEST_MAXLEN, 'big')167 return raw_len168 def encode(self):169 res = self.request_len()170 res += safe_encode(self.request)171 res += safe_encode(self.secdata)172 res += safe_encode(self.salt)173 return res174 def encrypt(self, key, iv=None):175 if iv is None:176 iv = rand_bytes(16)177 data = self.encode()178 cipher = crypto.Grasshopper(key)179 cbc = crypto.CBC(cipher)180 cbc.set_iv(iv)181 reply = cbc.encrypt(data)182 reply = iv + reply183 return reply184 @classmethod185 def parse(cls, raw):186 req_len = parse_number(raw[:REQUEST_MAXLEN])187 raw = raw[REQUEST_MAXLEN : ]188 req = Request.parse(raw[ : req_len])189 secdata = parse_str(raw[req_len : -SALT_LENGTH])190 salt = raw[ -SALT_LENGTH : ]191 return cls(req, secdata, salt=salt)192 def __eq__(self, other):193 ch1 = self.request == other.request194 ch2 = self.secdata == other.secdata195 return ch1 and ch2196class Response:197 def __init__(self, ID, blob, ttl, answer, 198 key_pair = None, sig = None):199 self.iid = ID200 self.blob = blob201 self.ttl = ttl202 self.answer = answer203 if sig:204 self.sig = sig205 else:206 content = encode_id(self.iid)207 content += safe_encode(self.blob)208 content += safe_encode(self.ttl)209 content += safe_encode(self.answer)210 self.sig = key_pair.sign(content)211 def content(self):212 res = encode_id(self.iid)213 res += safe_encode(self.blob)214 res += safe_encode(self.ttl)215 res += safe_encode(self.answer)216 return res217 def encode(self):218 return self.content() + self.sig219 def to_dict(self):220 d = {}221 d["iid"] = self.iid222 d["blob"] = 
self.blob.to_dict()223 d["ttl"] = self.ttl.to_dict()224 d["ans"] = self.answer225 d["sig"] = self.sig226 return d227 @classmethod228 def from_dict(cls, d):229 iid = d["iid"]...

Full Screen

Full Screen

tests_encodeutils.py

Source:tests_encodeutils.py Github

copy

Full Screen

...49 def test_safe_encode_force_incoming_utf8_to_ascii(self):50 # Forcing incoming to ascii so it falls back to utf-851 self.assertEqual(52 'ni\xc3\xb1o'.encode("latin-1"),53 encodeutils.safe_encode('ni\xc3\xb1o'.encode("latin-1"),54 incoming='ascii'),55 )56 def test_safe_encode_same_encoding_different_cases(self):57 with mock.patch.object(encodeutils, 'safe_decode', mock.Mock()):58 utf8 = encodeutils.safe_encode(59 'foo\xf1bar', encoding='utf-8')60 self.assertEqual(61 encodeutils.safe_encode(utf8, 'UTF-8', 'utf-8'),62 encodeutils.safe_encode(utf8, 'utf-8', 'UTF-8'),63 )64 self.assertEqual(65 encodeutils.safe_encode(utf8, 'UTF-8', 'utf-8'),66 encodeutils.safe_encode(utf8, 'utf-8', 'utf-8'),67 )68 encodeutils.safe_decode.assert_has_calls([])69 def test_safe_encode_different_encodings(self):70 text = 'foo\xc3\xb1bar'71 result = encodeutils.safe_encode(72 text=text, incoming='utf-8', encoding='iso-8859-1')73 self.assertNotEqual(text, result)74 self.assertNotEqual("foo\xf1bar".encode("latin-1"), result)75 def test_to_utf8(self):76 self.assertEqual(encodeutils.to_utf8(b'a\xe9\xff'), # bytes77 b'a\xe9\xff')78 self.assertEqual(encodeutils.to_utf8(u'a\xe9\xff\u20ac'), # Unicode79 b'a\xc3\xa9\xc3\xbf\xe2\x82\xac')80 self.assertRaises(TypeError, encodeutils.to_utf8, 123) # invalid81 # oslo.i18n Message objects should also be accepted for convenience.82 # It works because Message is a subclass of str. Use the83 # lazy translation to get a Message instance of oslo_i18n.84 msg = oslo_i18n_fixture.Translation().lazy("test")85 self.assertEqual(encodeutils.to_utf8(msg),...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Nose2 automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful