How to use the _parse_url method in gabbi

Best Python code snippets using gabbi

hadoopfilesystem.py

Source: hadoopfilesystem.py (GitHub)


...
        'http://%s:%s' % (hdfs_host, str(hdfs_port)), user=hdfs_user)

  @classmethod
  def scheme(cls):
    return 'hdfs'

  def _parse_url(self, url):
    """Verifies that url begins with hdfs:// prefix, strips it and adds a
    leading /.

    Parsing behavior is determined by HadoopFileSystemOptions.hdfs_full_urls.

    Args:
      url: (str) A URL in the form hdfs://path/...
        or in the form hdfs://server/path/...

    Raises:
      ValueError if the URL doesn't match the expected format.

    Returns:
      (str, str) If using hdfs_full_urls, for an input of
      'hdfs://server/path/...' will return (server, '/path/...').
      Otherwise, for an input of 'hdfs://path/...', will return
      ('', '/path/...').
    """
    if not self._full_urls:
      m = _URL_RE.match(url)
      if m is None:
        raise ValueError('Could not parse url: %s' % url)
      return '', m.group(1)
    else:
      m = _FULL_URL_RE.match(url)
      if m is None:
        raise ValueError('Could not parse url: %s' % url)
      return m.group(1), m.group(2) or '/'

  def join(self, base_url, *paths):
    """Join two or more pathname components.

    Args:
      base_url: string path of the first component of the path.
        Must start with hdfs://.
      paths: path components to be added

    Returns:
      Full url after combining all the passed components.
    """
    server, basepath = self._parse_url(base_url)
    return _HDFS_PREFIX + self._join(server, basepath, *paths)

  def _join(self, server, basepath, *paths):
    res = posixpath.join(basepath, *paths)
    if server:
      server = '/' + server
    return server + res

  def split(self, url):
    server, rel_path = self._parse_url(url)
    if server:
      server = '/' + server
    head, tail = posixpath.split(rel_path)
    return _HDFS_PREFIX + server + head, tail

  def mkdirs(self, url):
    _, path = self._parse_url(url)
    if self._exists(path):
      raise BeamIOError('Path already exists: %s' % path)
    return self._mkdirs(path)

  def _mkdirs(self, path):
    self._hdfs_client.makedirs(path)

  def has_dirs(self):
    return True

  def _list(self, url):
    try:
      server, path = self._parse_url(url)
      for res in self._hdfs_client.list(path, status=True):
        yield FileMetadata(
            _HDFS_PREFIX + self._join(server, path, res[0]),
            res[1][_FILE_STATUS_LENGTH])
    except Exception as e:  # pylint: disable=broad-except
      raise BeamIOError('List operation failed', {url: e})

  @staticmethod
  def _add_compression(stream, path, mime_type, compression_type):
    if mime_type != 'application/octet-stream':
      _LOGGER.warning(
          'Mime types are not supported. Got non-default mime_type: %s',
          mime_type)
    if compression_type == CompressionTypes.AUTO:
      compression_type = CompressionTypes.detect_compression_type(path)
    if compression_type != CompressionTypes.UNCOMPRESSED:
      return CompressedFile(stream)
    return stream

  def create(
      self,
      url,
      mime_type='application/octet-stream',
      compression_type=CompressionTypes.AUTO):
    # type: (...) -> BinaryIO
    """
    Returns:
      A Python File-like object.
    """
    _, path = self._parse_url(url)
    return self._create(path, mime_type, compression_type)

  def _create(
      self,
      path,
      mime_type='application/octet-stream',
      compression_type=CompressionTypes.AUTO):
    stream = io.BufferedWriter(
        filesystemio.UploaderStream(HdfsUploader(self._hdfs_client, path)),
        buffer_size=_DEFAULT_BUFFER_SIZE)
    return self._add_compression(stream, path, mime_type, compression_type)

  def open(
      self,
      url,
      mime_type='application/octet-stream',
      compression_type=CompressionTypes.AUTO):
    # type: (...) -> BinaryIO
    """
    Returns:
      A Python File-like object.
    """
    _, path = self._parse_url(url)
    return self._open(path, mime_type, compression_type)

  def _open(
      self,
      path,
      mime_type='application/octet-stream',
      compression_type=CompressionTypes.AUTO):
    stream = io.BufferedReader(
        filesystemio.DownloaderStream(HdfsDownloader(self._hdfs_client, path)),
        buffer_size=_DEFAULT_BUFFER_SIZE)
    return self._add_compression(stream, path, mime_type, compression_type)

  def copy(self, source_file_names, destination_file_names):
    """
    It is an error if any file to copy already exists at the destination.

    Raises ``BeamIOError`` if any error occurred.

    Args:
      source_file_names: iterable of URLs.
      destination_file_names: iterable of URLs.
    """
    if len(source_file_names) != len(destination_file_names):
      raise BeamIOError(
          'source_file_names and destination_file_names should '
          'be equal in length: %d != %d' %
          (len(source_file_names), len(destination_file_names)))

    def _copy_file(source, destination):
      with self._open(source) as f1:
        with self._create(destination) as f2:
          while True:
            buf = f1.read(_COPY_BUFFER_SIZE)
            if not buf:
              break
            f2.write(buf)

    def _copy_path(source, destination):
      """Recursively copy the file tree from the source to the destination."""
      if self._hdfs_client.status(
          source)[_FILE_STATUS_TYPE] != _FILE_STATUS_TYPE_DIRECTORY:
        _copy_file(source, destination)
        return
      for path, dirs, files in self._hdfs_client.walk(source):
        for dir in dirs:
          new_dir = self._join('', destination, dir)
          if not self._exists(new_dir):
            self._mkdirs(new_dir)
        rel_path = posixpath.relpath(path, source)
        if rel_path == '.':
          rel_path = ''
        for file in files:
          _copy_file(
              self._join('', path, file),
              self._join('', destination, rel_path, file))

    exceptions = {}
    for source, destination in zip(source_file_names, destination_file_names):
      try:
        _, rel_source = self._parse_url(source)
        _, rel_destination = self._parse_url(destination)
        _copy_path(rel_source, rel_destination)
      except Exception as e:  # pylint: disable=broad-except
        exceptions[(source, destination)] = e
    if exceptions:
      raise BeamIOError('Copy operation failed', exceptions)

  def rename(self, source_file_names, destination_file_names):
    exceptions = {}
    for source, destination in zip(source_file_names, destination_file_names):
      try:
        _, rel_source = self._parse_url(source)
        _, rel_destination = self._parse_url(destination)
        try:
          self._hdfs_client.rename(rel_source, rel_destination)
        except hdfs.HdfsError as e:
          raise BeamIOError(
              'libhdfs error in renaming %s to %s' % (source, destination), e)
      except Exception as e:  # pylint: disable=broad-except
        exceptions[(source, destination)] = e
    if exceptions:
      raise BeamIOError('Rename operation failed', exceptions)

  def exists(self, url):
    # type: (str) -> bool
    """Checks existence of url in HDFS.

    Args:
      url: String in the form hdfs://...

    Returns:
      True if url exists as a file or directory in HDFS.
    """
    _, path = self._parse_url(url)
    return self._exists(path)

  def _exists(self, path):
    """Returns True if path exists as a file or directory in HDFS.

    Args:
      path: String in the form /...
    """
    return self._hdfs_client.status(path, strict=False) is not None

  def size(self, url):
    _, path = self._parse_url(url)
    status = self._hdfs_client.status(path, strict=False)
    if status is None:
      raise BeamIOError('File not found: %s' % url)
    return status[_FILE_STATUS_LENGTH]

  def last_updated(self, url):
    raise NotImplementedError

  def checksum(self, url):
    """Fetches a checksum description for a URL.

    Returns:
      String describing the checksum.
    """
    _, path = self._parse_url(url)
    file_checksum = self._hdfs_client.checksum(path)
    return '%s-%d-%s' % (
        file_checksum[_FILE_CHECKSUM_ALGORITHM],
        file_checksum[_FILE_CHECKSUM_LENGTH],
        file_checksum[_FILE_CHECKSUM_BYTES],
    )

  def delete(self, urls):
    exceptions = {}
    for url in urls:
      try:
        _, path = self._parse_url(url)
        self._hdfs_client.delete(path, recursive=True)
      except Exception as e:  # pylint: disable=broad-except
        exceptions[url] = e
    if exceptions:
      ...
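Every public method in this filesystem funnels URLs through _parse_url, which strips the hdfs:// prefix and optionally splits out a server component. The following is a minimal standalone sketch of that behavior; the regex patterns here are assumptions modeled on the docstring above, not Beam's actual _URL_RE / _FULL_URL_RE definitions.

import re

# Hypothetical stand-ins for Beam's _URL_RE / _FULL_URL_RE, reconstructed
# from the documented behavior; the real patterns may differ.
_URL_RE = re.compile(r'^hdfs:/(/.*)$')
_FULL_URL_RE = re.compile(r'^hdfs://([^/]*)(/.*)?$')


def parse_url(url, full_urls=False):
  """Split an hdfs:// URL into (server, '/path/...')."""
  m = (_FULL_URL_RE if full_urls else _URL_RE).match(url)
  if m is None:
    raise ValueError('Could not parse url: %s' % url)
  if full_urls:
    # Server is the first path segment; default the path to '/'.
    return m.group(1), m.group(2) or '/'
  # Without full URLs there is no server component.
  return '', m.group(1)


print(parse_url('hdfs://data/sets/part-0.txt'))
# ('', '/data/sets/part-0.txt')
print(parse_url('hdfs://namenode/data/part-0.txt', full_urls=True))
# ('namenode', '/data/part-0.txt')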


test_parse_url.py

Source: test_parse_url.py (GitHub)


...
        http_case.prefix = prefix
        http_case.test_data['ssl'] = ssl
        http_case.test_data['query_parameters'] = params or {}
        return http_case

    def test_parse_url(self):
        host = uuid.uuid4()
        http_case = self.make_test_case(host)
        parsed_url = http_case._parse_url('/foobar')
        self.assertEqual('http://%s:8000/foobar' % host, parsed_url)

    def test_parse_prefix(self):
        host = uuid.uuid4()
        http_case = self.make_test_case(host, prefix='/noise')
        parsed_url = http_case._parse_url('/foobar')
        self.assertEqual('http://%s:8000/noise/foobar' % host, parsed_url)

    def test_parse_full(self):
        host = uuid.uuid4()
        http_case = self.make_test_case(host)
        parsed_url = http_case._parse_url('http://example.com/house')
        self.assertEqual('http://example.com/house', parsed_url)

    def test_with_ssl(self):
        host = uuid.uuid4().hex
        http_case = self.make_test_case(host, ssl=True)
        parsed_url = http_case._parse_url('/foobar')
        self.assertEqual('https://%s:8000/foobar' % host, parsed_url)

    def test_default_port_http(self):
        host = uuid.uuid4().hex
        http_case = self.make_test_case(host, port='80')
        parsed_url = http_case._parse_url('/foobar')
        self.assertEqual('http://%s/foobar' % host, parsed_url)

    def test_default_port_int(self):
        host = uuid.uuid4().hex
        http_case = self.make_test_case(host, port=80)
        parsed_url = http_case._parse_url('/foobar')
        self.assertEqual('http://%s/foobar' % host, parsed_url)

    def test_default_port_https(self):
        host = uuid.uuid4().hex
        http_case = self.make_test_case(host, port='443', ssl=True)
        parsed_url = http_case._parse_url('/foobar')
        self.assertEqual('https://%s/foobar' % host, parsed_url)

    def test_default_port_https_no_ssl(self):
        host = uuid.uuid4().hex
        http_case = self.make_test_case(host, port='443')
        parsed_url = http_case._parse_url('/foobar')
        self.assertEqual('http://%s:443/foobar' % host, parsed_url)

    def test_https_port_80_ssl(self):
        host = uuid.uuid4().hex
        http_case = self.make_test_case(host, port='80', ssl=True)
        parsed_url = http_case._parse_url('/foobar')
        self.assertEqual('https://%s:80/foobar' % host, parsed_url)

    def test_add_query_params(self):
        host = uuid.uuid4().hex
        # Use a sequence of tuples to ensure order.
        query = OrderedDict([('x', 1), ('y', 2)])
        http_case = self.make_test_case(host, params=query)
        parsed_url = http_case._parse_url('/foobar')
        self.assertEqual('http://%s:8000/foobar?x=1&y=2' % host, parsed_url)

    def test_extend_query_params(self):
        host = uuid.uuid4().hex
        # Use a sequence of tuples to ensure order.
        query = OrderedDict([('x', 1), ('y', 2)])
        http_case = self.make_test_case(host, params=query)
        parsed_url = http_case._parse_url('/foobar?alpha=beta')
        self.assertEqual('http://%s:8000/foobar?alpha=beta&x=1&y=2'
                         % host, parsed_url)

    def test_extend_query_params_full_ulr(self):
        host = 'stub'
        query = OrderedDict([('x', 1), ('y', 2)])
        http_case = self.make_test_case(host, params=query)
        parsed_url = http_case._parse_url(
            'http://example.com/foobar?alpha=beta')
        self.assertEqual('http://example.com/foobar?alpha=beta&x=1&y=2',
                         ...
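Note that _parse_url is a private helper on gabbi's HTTPTestCase, which is why these tests construct a bare case by hand rather than going through gabbi's YAML loader. The following is a minimal sketch of the same pattern; the host/port assignments and the test_data initialization are assumptions based on the truncated portion of the snippet above, which only gabbi's own source confirms.

from collections import OrderedDict

from gabbi import case


def make_test_case(host, port=8000, prefix='', ssl=False, params=None):
    # 'test_request' names the test method HTTPTestCase would normally run.
    http_case = case.HTTPTestCase('test_request')
    # Assumed from the truncated lines: the case needs host/port/prefix
    # attributes and a test_data dict with 'ssl' and 'query_parameters'.
    http_case.host = host
    http_case.port = port
    http_case.prefix = prefix
    http_case.test_data = {'ssl': ssl, 'query_parameters': params or {}}
    return http_case


http_case = make_test_case('example.com', port='443', ssl=True,
                           params=OrderedDict([('x', 1)]))
# Per the tests above: default ports (80 for http, 443 for https) are
# omitted from the result, and query parameters are appended.
print(http_case._parse_url('/foobar'))  # https://example.com/foobar?x=1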


