Best Python code snippets using gabbi_python
hadoopfilesystem.py
Source:hadoopfilesystem.py  
# NOTE(review): reconstructed from a mangled, line-number-fused paste of the
# body of Apache Beam's HadoopFileSystem (apache_beam/io/hadoopfilesystem.py).
# The paste began mid-statement (tail of the hdfs client construction); that
# unrecoverable fragment is elided.  Module-level names referenced below
# (_URL_RE, _FULL_URL_RE, _HDFS_PREFIX, _DEFAULT_BUFFER_SIZE,
# _COPY_BUFFER_SIZE, _FILE_STATUS_*, _FILE_CHECKSUM_*, _LOGGER, BeamIOError,
# CompressionTypes, CompressedFile, FileMetadata, filesystemio, HdfsUploader,
# HdfsDownloader, hdfs, io, posixpath) are defined elsewhere in the module.

@classmethod
def scheme(cls):
  """Return the URL scheme handled by this filesystem ('hdfs')."""
  return 'hdfs'


def _parse_url(self, url):
  """Strip the hdfs:// prefix from ``url`` and split off the server part.

  Parsing behavior is determined by HadoopFileSystemOptions.hdfs_full_urls.

  Args:
    url: (str) A URL in the form hdfs://path/... or, with full URLs
      enabled, hdfs://server/path/...

  Raises:
    ValueError: if the URL doesn't match the expected format.

  Returns:
    (str, str) With hdfs_full_urls, 'hdfs://server/path/...' yields
    (server, '/path/...'); otherwise 'hdfs://path/...' yields
    ('', '/path/...').
  """
  if not self._full_urls:
    match = _URL_RE.match(url)
    if match is None:
      raise ValueError('Could not parse url: %s' % url)
    return '', match.group(1)
  match = _FULL_URL_RE.match(url)
  if match is None:
    raise ValueError('Could not parse url: %s' % url)
  # An empty path component maps to the filesystem root.
  return match.group(1), match.group(2) or '/'


def join(self, base_url, *paths):
  """Join two or more pathname components.

  Args:
    base_url: string path of the first component of the path.
      Must start with hdfs://.
    paths: path components to be appended.

  Returns:
    Full url after combining all the passed components.
  """
  server, basepath = self._parse_url(base_url)
  return _HDFS_PREFIX + self._join(server, basepath, *paths)


def _join(self, server, basepath, *paths):
  # Join without the hdfs:// prefix; a non-empty server becomes the first
  # path component.
  joined = posixpath.join(basepath, *paths)
  prefix = '/' + server if server else ''
  return prefix + joined


def split(self, url):
  """Split ``url`` into (directory url, basename)."""
  server, rel_path = self._parse_url(url)
  prefix = '/' + server if server else ''
  head, tail = posixpath.split(rel_path)
  return _HDFS_PREFIX + prefix + head, tail


def mkdirs(self, url):
  """Create a directory tree; fails if the path already exists."""
  _, path = self._parse_url(url)
  if self._exists(path):
    raise BeamIOError('Path already exists: %s' % path)
  return self._mkdirs(path)


def _mkdirs(self, path):
  self._hdfs_client.makedirs(path)


def has_dirs(self):
  # HDFS has real directories (unlike flat object stores).
  return True


def _list(self, url):
  """Yield FileMetadata for every entry directly under ``url``."""
  try:
    server, path = self._parse_url(url)
    for entry in self._hdfs_client.list(path, status=True):
      # entry is (name, status_dict) because status=True.
      yield FileMetadata(
          _HDFS_PREFIX + self._join(server, path, entry[0]),
          entry[1][_FILE_STATUS_LENGTH])
  except Exception as e:  # pylint: disable=broad-except
    raise BeamIOError('List operation failed', {url: e})


@staticmethod
def _add_compression(stream, path, mime_type, compression_type):
  """Wrap ``stream`` in a CompressedFile when compression applies."""
  if mime_type != 'application/octet-stream':
    _LOGGER.warning(
        'Mime types are not supported. Got non-default mime_type:'
        ' %s',
        mime_type)
  if compression_type == CompressionTypes.AUTO:
    compression_type = CompressionTypes.detect_compression_type(path)
  if compression_type != CompressionTypes.UNCOMPRESSED:
    return CompressedFile(stream)
  return stream


def create(
    self,
    url,
    mime_type='application/octet-stream',
    compression_type=CompressionTypes.AUTO):
  # type: (...) -> BinaryIO
  """
  Returns:
    A Python File-like object.
  """
  _, path = self._parse_url(url)
  return self._create(path, mime_type, compression_type)


def _create(
    self,
    path,
    mime_type='application/octet-stream',
    compression_type=CompressionTypes.AUTO):
  # Buffered writer over the HDFS uploader, optionally compressed.
  stream = io.BufferedWriter(
      filesystemio.UploaderStream(HdfsUploader(self._hdfs_client, path)),
      buffer_size=_DEFAULT_BUFFER_SIZE)
  return self._add_compression(stream, path, mime_type, compression_type)


def open(
    self,
    url,
    mime_type='application/octet-stream',
    compression_type=CompressionTypes.AUTO):
  # type: (...) -> BinaryIO
  """
  Returns:
    A Python File-like object.
  """
  _, path = self._parse_url(url)
  return self._open(path, mime_type, compression_type)


def _open(
    self,
    path,
    mime_type='application/octet-stream',
    compression_type=CompressionTypes.AUTO):
  # Buffered reader over the HDFS downloader, optionally decompressed.
  stream = io.BufferedReader(
      filesystemio.DownloaderStream(HdfsDownloader(self._hdfs_client, path)),
      buffer_size=_DEFAULT_BUFFER_SIZE)
  return self._add_compression(stream, path, mime_type, compression_type)


def copy(self, source_file_names, destination_file_names):
  """
  It is an error if any file to copy already exists at the destination.

  Raises ``BeamIOError`` if any error occurred.

  Args:
    source_file_names: iterable of URLs.
    destination_file_names: iterable of URLs.
  """
  if len(source_file_names) != len(destination_file_names):
    raise BeamIOError(
        'source_file_names and destination_file_names should '
        'be equal in length: %d != %d' %
        (len(source_file_names), len(destination_file_names)))

  def _copy_file(source, destination):
    # Stream the file across in fixed-size chunks.
    with self._open(source) as src:
      with self._create(destination) as dst:
        while True:
          chunk = src.read(_COPY_BUFFER_SIZE)
          if not chunk:
            break
          dst.write(chunk)

  def _copy_path(source, destination):
    """Recursively copy the file tree from the source to the destination."""
    if self._hdfs_client.status(
        source)[_FILE_STATUS_TYPE] != _FILE_STATUS_TYPE_DIRECTORY:
      _copy_file(source, destination)
      return
    for path, dirs, files in self._hdfs_client.walk(source):
      for dir_name in dirs:
        new_dir = self._join('', destination, dir_name)
        if not self._exists(new_dir):
          self._mkdirs(new_dir)
      rel_path = posixpath.relpath(path, source)
      if rel_path == '.':
        rel_path = ''
      for file_name in files:
        _copy_file(
            self._join('', path, file_name),
            self._join('', destination, rel_path, file_name))

  # Collect per-pair failures so one bad pair doesn't abort the rest.
  exceptions = {}
  for source, destination in zip(source_file_names, destination_file_names):
    try:
      _, rel_source = self._parse_url(source)
      _, rel_destination = self._parse_url(destination)
      _copy_path(rel_source, rel_destination)
    except Exception as e:  # pylint: disable=broad-except
      exceptions[(source, destination)] = e
  if exceptions:
    raise BeamIOError('Copy operation failed', exceptions)


def rename(self, source_file_names, destination_file_names):
  """Rename each source URL to the corresponding destination URL."""
  exceptions = {}
  for source, destination in zip(source_file_names, destination_file_names):
    try:
      _, rel_source = self._parse_url(source)
      _, rel_destination = self._parse_url(destination)
      try:
        self._hdfs_client.rename(rel_source, rel_destination)
      except hdfs.HdfsError as e:
        # Re-raise with a clearer message; caught and collected below.
        raise BeamIOError(
            'libhdfs error in renaming %s to %s' % (source, destination), e)
    except Exception as e:  # pylint: disable=broad-except
      exceptions[(source, destination)] = e
  if exceptions:
    raise BeamIOError('Rename operation failed', exceptions)


def exists(self, url):
  # type: (str) -> bool
  """Checks existence of url in HDFS.

  Args:
    url: String in the form hdfs://...

  Returns:
    True if url exists as a file or directory in HDFS.
  """
  _, path = self._parse_url(url)
  return self._exists(path)


def _exists(self, path):
  """Returns True if path exists as a file or directory in HDFS.

  Args:
    path: String in the form /...
  """
  # strict=False makes status() return None instead of raising.
  return self._hdfs_client.status(path, strict=False) is not None


def size(self, url):
  """Return the length in bytes of the file at ``url``."""
  _, path = self._parse_url(url)
  status = self._hdfs_client.status(path, strict=False)
  if status is None:
    raise BeamIOError('File not found: %s' % url)
  return status[_FILE_STATUS_LENGTH]


def last_updated(self, url):
  raise NotImplementedError


def checksum(self, url):
  """Fetches a checksum description for a URL.

  Returns:
    String describing the checksum.
  """
  _, path = self._parse_url(url)
  file_checksum = self._hdfs_client.checksum(path)
  return '%s-%d-%s' % (
      file_checksum[_FILE_CHECKSUM_ALGORITHM],
      file_checksum[_FILE_CHECKSUM_LENGTH],
      file_checksum[_FILE_CHECKSUM_BYTES],
  )


def delete(self, urls):
  """Recursively delete each URL, collecting per-URL failures."""
  exceptions = {}
  for url in urls:
    try:
      _, path = self._parse_url(url)
      self._hdfs_client.delete(path, recursive=True)
    except Exception as e:  # pylint: disable=broad-except
      exceptions[url] = e
  if exceptions:
    # NOTE(review): the paste was truncated immediately after the
    # ``if exceptions:`` line; this raise is completed to match the
    # copy/rename pattern above — confirm against the upstream source.
    raise BeamIOError('Delete operation failed', exceptions)
Source:test_parse_url.py  
# NOTE(review): reconstructed from a mangled, line-number-fused paste of
# gabbi's test_parse_url.py (methods of a TestCase subclass whose header is
# not visible here).  The paste began mid-way through make_test_case — that
# truncated fragment is elided — and ended mid-assertion; the final call is
# completed with its obvious closing argument (confirm against upstream).
# ``uuid`` and ``OrderedDict`` are imported at the top of the real module.

def test_parse_url(self):
    host = uuid.uuid4()
    http_case = self.make_test_case(host)
    parsed_url = http_case._parse_url('/foobar')
    self.assertEqual('http://%s:8000/foobar' % host, parsed_url)

def test_parse_prefix(self):
    host = uuid.uuid4()
    http_case = self.make_test_case(host, prefix='/noise')
    parsed_url = http_case._parse_url('/foobar')
    self.assertEqual('http://%s:8000/noise/foobar' % host, parsed_url)

def test_parse_full(self):
    # A fully-qualified URL is returned untouched.
    host = uuid.uuid4()
    http_case = self.make_test_case(host)
    parsed_url = http_case._parse_url('http://example.com/house')
    self.assertEqual('http://example.com/house', parsed_url)

def test_with_ssl(self):
    host = uuid.uuid4().hex
    http_case = self.make_test_case(host, ssl=True)
    parsed_url = http_case._parse_url('/foobar')
    self.assertEqual('https://%s:8000/foobar' % host, parsed_url)

def test_default_port_http(self):
    # Port 80 with plain http is omitted from the result.
    host = uuid.uuid4().hex
    http_case = self.make_test_case(host, port='80')
    parsed_url = http_case._parse_url('/foobar')
    self.assertEqual('http://%s/foobar' % host, parsed_url)

def test_default_port_int(self):
    # An integer port behaves the same as its string form.
    host = uuid.uuid4().hex
    http_case = self.make_test_case(host, port=80)
    parsed_url = http_case._parse_url('/foobar')
    self.assertEqual('http://%s/foobar' % host, parsed_url)

def test_default_port_https(self):
    # Port 443 with ssl is omitted from the result.
    host = uuid.uuid4().hex
    http_case = self.make_test_case(host, port='443', ssl=True)
    parsed_url = http_case._parse_url('/foobar')
    self.assertEqual('https://%s/foobar' % host, parsed_url)

def test_default_port_https_no_ssl(self):
    # Port 443 without ssl is NOT a default port, so it is kept.
    host = uuid.uuid4().hex
    http_case = self.make_test_case(host, port='443')
    parsed_url = http_case._parse_url('/foobar')
    self.assertEqual('http://%s:443/foobar' % host, parsed_url)

def test_https_port_80_ssl(self):
    # Port 80 with ssl is NOT a default port, so it is kept.
    host = uuid.uuid4().hex
    http_case = self.make_test_case(host, port='80', ssl=True)
    parsed_url = http_case._parse_url('/foobar')
    self.assertEqual('https://%s:80/foobar' % host, parsed_url)

def test_add_query_params(self):
    host = uuid.uuid4().hex
    # Use a sequence of tuples to ensure order.
    query = OrderedDict([('x', 1), ('y', 2)])
    http_case = self.make_test_case(host, params=query)
    parsed_url = http_case._parse_url('/foobar')
    self.assertEqual('http://%s:8000/foobar?x=1&y=2' % host, parsed_url)

def test_extend_query_params(self):
    host = uuid.uuid4().hex
    # Use a sequence of tuples to ensure order.
    query = OrderedDict([('x', 1), ('y', 2)])
    http_case = self.make_test_case(host, params=query)
    parsed_url = http_case._parse_url('/foobar?alpha=beta')
    self.assertEqual('http://%s:8000/foobar?alpha=beta&x=1&y=2'
                     % host, parsed_url)

def test_extend_query_params_full_ulr(self):
    # Name typo ('ulr') preserved from the original source.
    host = 'stub'
    query = OrderedDict([('x', 1), ('y', 2)])
    http_case = self.make_test_case(host, params=query)
    parsed_url = http_case._parse_url(
        'http://example.com/foobar?alpha=beta')
    self.assertEqual('http://example.com/foobar?alpha=beta&x=1&y=2',
                     parsed_url)
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
