How to use is_write_request method in localstack

Best Python code snippet using localstack_python

request.py

Source:request.py Github

copy

Full Screen

# -*- coding: utf-8 -*-
# imageio is distributed under the terms of the (new) BSD License.

"""
Definition of the Request object, which acts as a kind of bridge between
what the user wants and what the plugins can.
"""

import os
from io import BytesIO
import zipfile
import tempfile
import shutil

from ..core import urlopen, get_remote_file

try:
    from pathlib import Path
except ImportError:
    Path = None

# URI types
URI_BYTES = 1
URI_FILE = 2
URI_FILENAME = 3
URI_ZIPPED = 4
URI_HTTP = 5
URI_FTP = 6

SPECIAL_READ_URIS = "<video", "<screen>", "<clipboard>"

# The user can use this string in a write call to get the data back as bytes.
RETURN_BYTES = "<bytes>"

# Example images that will be auto-downloaded
EXAMPLE_IMAGES = {
    "astronaut.png": "Image of the astronaut Eileen Collins",
    "camera.png": "Classic grayscale image of a photographer",
    "checkerboard.png": "Black and white image of a chekerboard",
    "wood.jpg": "A (repeatable) texture of wooden planks",
    "bricks.jpg": "A (repeatable) texture of stone bricks",
    "clock.png": "Photo of a clock with motion blur (Stefan van der Walt)",
    "coffee.png": "Image of a cup of coffee (Rachel Michetti)",
    "chelsea.png": "Image of Stefan's cat",
    "wikkie.png": "Image of Almar's cat",
    "coins.png": "Image showing greek coins from Pompeii",
    "horse.png": "Image showing the silhouette of a horse (Andreas Preuss)",
    "hubble_deep_field.png": "Photograph taken by Hubble telescope (NASA)",
    "immunohistochemistry.png": "Immunohistochemical (IHC) staining",
    "moon.png": "Image showing a portion of the surface of the moon",
    "page.png": "A scanned page of text",
    "text.png": "A photograph of handdrawn text",
    "chelsea.zip": "The chelsea.png in a zipfile (for testing)",
    "chelsea.bsdf": "The chelsea.png in a BSDF file(for testing)",
    "newtonscradle.gif": "Animated GIF of a newton's cradle",
    "cockatoo.mp4": "Video file of a cockatoo",
    "stent.npz": "Volumetric image showing a stented abdominal aorta",
    "meadow_cube.jpg": "A cubemap image of a meadow, e.g. to render a skybox.",
}


class Request(object):
    """ Request(uri, mode, **kwargs)

    Represents a request for reading or saving an image resource. This
    object wraps information to that request and acts as an interface
    for the plugins to several resources; it allows the user to read
    from filenames, files, http, zipfiles, raw bytes, etc., but offer
    a simple interface to the plugins via ``get_file()`` and
    ``get_local_filename()``.

    For each read/write operation a single Request instance is used and passed
    to the can_read/can_write method of a format, and subsequently to
    the Reader/Writer class. This allows rudimentary passing of
    information between different formats and between a format and
    associated reader/writer.

    parameters
    ----------
    uri : {str, bytes, file}
        The resource to load the image from.
    mode : str
        The first character is "r" or "w", indicating a read or write
        request. The second character is used to indicate the kind of data:
        "i" for an image, "I" for multiple images, "v" for a volume,
        "V" for multiple volumes, "?" for don't care.
    """

    def __init__(self, uri, mode, **kwargs):
        # General
        self._uri_type = None
        self._filename = None
        self._extension = None
        self._kwargs = kwargs
        self._result = None  # Some write actions may have a result

        # To handle the user-side
        self._filename_zip = None  # not None if a zipfile is used
        self._bytes = None  # Incoming bytes
        self._zipfile = None  # To store a zipfile instance (if used)

        # To handle the plugin side
        self._file = None  # To store the file instance
        self._file_is_local = False  # whether the data needs to be copied at end
        self._filename_local = None  # not None if using tempfile on this FS
        self._firstbytes = None  # For easy header parsing

        # To store formats that may be able to fulfil this request
        # self._potential_formats = []

        # Check mode
        self._mode = mode
        if not isinstance(mode, str):
            raise ValueError("Request requires mode must be a string")
        if len(mode) != 2:
            raise ValueError("Request requires mode to have two chars")
        if mode[0] not in "rw":
            raise ValueError('Request requires mode[0] to be "r" or "w"')
        if mode[1] not in "iIvV?":
            raise ValueError('Request requires mode[1] to be in "iIvV?"')

        # Parse what was given
        self._parse_uri(uri)

        # Set extension
        if self._filename is not None:
            ext = self._filename
            if self._filename.startswith(("http://", "https://", "ftp://", "ftps://")):
                # Drop the query string so it does not end up in the extension
                ext = ext.split("?")[0]
            self._extension = "." + ext.split(".")[-1].lower()

    def _parse_uri(self, uri):
        """ Try to figure out what we were given.

        Sets ``_uri_type`` and ``_filename`` (and, depending on the kind
        of uri, ``_bytes``, ``_file`` and ``_filename_zip``).

        Raises RuntimeError when writing to an ``imageio:`` standard
        image, ValueError for an unknown standard image, IOError when
        the uri is not understood or writing to http/ftp is requested,
        and FileNotFoundError when the file (read) or the target
        directory (write) does not exist.
        """
        is_read_request = self.mode[0] == "r"
        is_write_request = self.mode[0] == "w"

        if isinstance(uri, str):
            # Explicit
            if uri.startswith("imageio:"):
                if is_write_request:
                    raise RuntimeError("Cannot write to the standard images.")
                fn = uri.split(":", 1)[-1].lower()
                fn, _, zip_part = fn.partition(".zip/")
                if zip_part:
                    fn += ".zip"
                if fn not in EXAMPLE_IMAGES:
                    raise ValueError("Unknown standard image %r." % fn)
                self._uri_type = URI_FILENAME
                self._filename = get_remote_file("images/" + fn, auto=True)
                if zip_part:
                    self._filename += "/" + zip_part
            elif uri.startswith("http://") or uri.startswith("https://"):
                self._uri_type = URI_HTTP
                self._filename = uri
            elif uri.startswith("ftp://") or uri.startswith("ftps://"):
                self._uri_type = URI_FTP
                self._filename = uri
            elif uri.startswith("file://"):
                self._uri_type = URI_FILENAME
                self._filename = uri[7:]
            elif uri.startswith(SPECIAL_READ_URIS) and is_read_request:
                self._uri_type = URI_BYTES
                self._filename = uri
            elif uri.startswith(RETURN_BYTES) and is_write_request:
                self._uri_type = URI_BYTES
                self._filename = uri
            else:
                self._uri_type = URI_FILENAME
                self._filename = uri
        elif isinstance(uri, memoryview) and is_read_request:
            self._uri_type = URI_BYTES
            self._filename = "<bytes>"
            self._bytes = uri.tobytes()
        elif isinstance(uri, bytes) and is_read_request:
            self._uri_type = URI_BYTES
            self._filename = "<bytes>"
            self._bytes = uri
        elif Path is not None and isinstance(uri, Path):
            self._uri_type = URI_FILENAME
            self._filename = str(uri)
        # Files
        elif is_read_request:
            if hasattr(uri, "read") and hasattr(uri, "close"):
                self._uri_type = URI_FILE
                self._filename = "<file>"
                self._file = uri  # Data must be read from here
        elif is_write_request:
            if hasattr(uri, "write") and hasattr(uri, "close"):
                self._uri_type = URI_FILE
                self._filename = "<file>"
                self._file = uri  # Data must be written here

        # Expand user dir
        if self._uri_type == URI_FILENAME and self._filename.startswith("~"):
            self._filename = os.path.expanduser(self._filename)

        # Check if a zipfile
        if self._uri_type == URI_FILENAME:
            # Search for zip extension followed by a path separator
            for needle in [".zip/", ".zip\\"]:
                zip_i = self._filename.lower().find(needle)
                if zip_i > 0:
                    zip_i += 4  # skip past ".zip"
                    zip_path = self._filename[:zip_i]
                    if is_write_request or os.path.isfile(zip_path):
                        self._uri_type = URI_ZIPPED
                        self._filename_zip = (
                            zip_path,
                            self._filename[zip_i:].lstrip("/\\"),
                        )
                        break

        # Check if we could read it
        if self._uri_type is None:
            uri_r = repr(uri)
            if len(uri_r) > 60:
                uri_r = uri_r[:57] + "..."
            raise IOError("Cannot understand given URI: %s." % uri_r)

        # Check if this is supported
        noWriting = [URI_HTTP, URI_FTP]
        if is_write_request and self._uri_type in noWriting:
            raise IOError("imageio does not support writing to http/ftp.")

        # Deprecated way to load standard images, give a sensible error message
        if is_read_request and self._uri_type in [URI_FILENAME, URI_ZIPPED]:
            fn = self._filename
            if self._filename_zip:
                fn = self._filename_zip[0]
            if (not os.path.exists(fn)) and (fn in EXAMPLE_IMAGES):
                raise IOError(
                    "No such file: %r. This file looks like one of "
                    "the standard images, but from imageio 2.1, "
                    "standard images have to be specified using "
                    '"imageio:%s".' % (fn, fn)
                )

        # Make filename absolute
        if self._uri_type in [URI_FILENAME, URI_ZIPPED]:
            if self._filename_zip:
                self._filename_zip = (
                    os.path.abspath(self._filename_zip[0]),
                    self._filename_zip[1],
                )
            else:
                self._filename = os.path.abspath(self._filename)

        # Check whether file name is valid
        if self._uri_type in [URI_FILENAME, URI_ZIPPED]:
            fn = self._filename
            if self._filename_zip:
                fn = self._filename_zip[0]
            if is_read_request:
                # Reading: check that the file exists (but is allowed a dir)
                if not os.path.exists(fn):
                    raise FileNotFoundError("No such file: '%s'" % fn)
            else:
                # Writing: check that the directory to write to does exist
                dn = os.path.dirname(fn)
                if not os.path.exists(dn):
                    raise FileNotFoundError("The directory %r does not exist" % dn)

    @property
    def filename(self):
        """ The uri for which reading/saving was requested. This
        can be a filename, an http address, or other resource
        identifier. Do not rely on the filename to obtain the data,
        but use ``get_file()`` or ``get_local_filename()`` instead.
        """
        return self._filename

    @property
    def extension(self):
        """ The (lowercase) extension of the requested filename.
        Suffixes in url's are stripped. Can be None if the request is
        not based on a filename.
        """
        return self._extension

    @property
    def mode(self):
        """ The mode of the request. The first character is "r" or "w",
        indicating a read or write request. The second character is
        used to indicate the kind of data:
        "i" for an image, "I" for multiple images, "v" for a volume,
        "V" for multiple volumes, "?" for don't care.
        """
        return self._mode

    @property
    def kwargs(self):
        """ The dict of keyword arguments supplied by the user.
        """
        return self._kwargs

    ## For obtaining data

    def get_file(self):
        """ get_file()

        Get a file object for the resource associated with this request.
        If this is a reading request, the file is in read mode,
        otherwise in write mode. This method is not thread safe. Plugins
        should not close the file when done.

        This is the preferred way to read/write the data. But if a
        format cannot handle file-like objects, they should use
        ``get_local_filename()``.
        """
        want_to_write = self.mode[0] == "w"

        # Is there already a file?
        # Either _uri_type == URI_FILE, or we already opened the file,
        # e.g. by using firstbytes
        if self._file is not None:
            return self._file

        if self._uri_type == URI_BYTES:
            if want_to_write:
                # Create new file object, we catch the bytes in finish()
                self._file = BytesIO()
                self._file_is_local = True
            else:
                self._file = BytesIO(self._bytes)

        elif self._uri_type == URI_FILENAME:
            if want_to_write:
                self._file = open(self.filename, "wb")
            else:
                self._file = open(self.filename, "rb")

        elif self._uri_type == URI_ZIPPED:
            # Get the correct filename
            filename, name = self._filename_zip
            if want_to_write:
                # Create new file object, we catch the bytes in finish()
                self._file = BytesIO()
                self._file_is_local = True
            else:
                # Open zipfile and open new file object for specific file
                self._zipfile = zipfile.ZipFile(filename, "r")
                self._file = self._zipfile.open(name, "r")
                self._file = SeekableFileObject(self._file)

        # NOTE: this used to read ``[URI_HTTP or URI_FTP]``, which evaluates
        # to ``[URI_HTTP]`` and silently left FTP requests without a file.
        elif self._uri_type in (URI_HTTP, URI_FTP):
            assert not want_to_write  # This should have been tested in init
            # Timeout in seconds, overridable via the environment; anything
            # that is not a plain non-negative integer falls back to 5.
            timeout = os.getenv("IMAGEIO_REQUEST_TIMEOUT")
            if timeout is None or not timeout.isdigit():
                timeout = 5
            self._file = urlopen(self.filename, timeout=float(timeout))
            self._file = SeekableFileObject(self._file)

        return self._file

    def get_local_filename(self):
        """ get_local_filename()

        If the filename is an existing file on this filesystem, return
        that. Otherwise a temporary file is created on the local file
        system which can be used by the format to read from or write to.
        """
        if self._uri_type == URI_FILENAME:
            return self._filename
        else:
            # Get filename
            if self._uri_type in (URI_HTTP, URI_FTP):
                ext = os.path.splitext(self._filename.split("?")[0])[1]
            else:
                ext = os.path.splitext(self._filename)[1]
            # mkstemp creates the file atomically, unlike the deprecated and
            # race-prone mktemp. We only need the name, so close the handle.
            fd, self._filename_local = tempfile.mkstemp(ext, "imageio_")
            os.close(fd)
            # Write stuff to it?
            if self.mode[0] == "r":
                with open(self._filename_local, "wb") as file:
                    shutil.copyfileobj(self.get_file(), file)
            return self._filename_local

    def finish(self):
        """ finish()

        For internal use (called when the context of the reader/writer
        exits). Finishes this request. Close open files and process
        results.
        """
        if self.mode[0] == "w":
            # See if we "own" the data and must put it somewhere
            data = None
            if self._filename_local:
                with open(self._filename_local, "rb") as file:
                    data = file.read()
            elif self._file_is_local:
                data = self._file.getvalue()

            # Put the data in the right place
            if data is not None:
                if self._uri_type == URI_BYTES:
                    self._result = data  # Retrieved via get_result()
                elif self._uri_type == URI_FILE:
                    self._file.write(data)
                elif self._uri_type == URI_ZIPPED:
                    # The context manager closes the archive even when
                    # writestr raises.
                    with zipfile.ZipFile(self._filename_zip[0], "a") as zf:
                        zf.writestr(self._filename_zip[1], data)
                # elif self._uri_type == URI_FILENAME: -> is always direct
                # elif self._uri_type == URI_FTP/HTTP: -> write not supported

        # Close open files that we know of (and are responsible for)
        if self._file and self._uri_type != URI_FILE:
            self._file.close()
            self._file = None
        if self._zipfile:
            self._zipfile.close()
            self._zipfile = None

        # Remove temp file
        if self._filename_local:
            try:
                os.remove(self._filename_local)
            except Exception:  # pragma: no cover
                pass  # best effort; a leftover temp file is not fatal
            self._filename_local = None

        # Detach so gc can clean even if a reference of self lingers
        self._bytes = None

    def get_result(self):
        """ For internal use. In some situations a write action can have
        a result (bytes data). That is obtained with this function.

        The stored result is cleared, so a second call returns None.
        """
        self._result, res = None, self._result
        return res

    @property
    def firstbytes(self):
        """ The first 256 bytes of the file. These can be used to
        parse the header to determine the file-format.
        """
        if self._firstbytes is None:
            self._read_first_bytes()
        return self._firstbytes

    def _read_first_bytes(self, N=256):
        """ Fill ``_firstbytes`` with up to N bytes of the resource and
        try to restore the file position afterwards.
        """
        if self._bytes is not None:
            self._firstbytes = self._bytes[:N]
        else:
            # Prepare
            try:
                f = self.get_file()
            except IOError:
                if os.path.isdir(self.filename):  # A directory, e.g. for DICOM
                    self._firstbytes = b""
                    return
                raise
            try:
                i = f.tell()
            except Exception:
                i = None
            # Read
            self._firstbytes = read_n_bytes(f, N)
            # Set back
            try:
                if i is None:
                    raise Exception("cannot seek with None")
                f.seek(i)
            except Exception:
                # Prevent get_file() from reusing the file
                self._file = None
                # If the given URI was a file object, we have a problem,
                if self._uri_type == URI_FILE:
                    raise IOError("Cannot seek back after getting firstbytes!")


def read_n_bytes(f, N):
    """ read_n_bytes(file, n)

    Read n bytes from the given file, or less if the file has less
    bytes. Returns zero bytes if the file is closed.
    """
    bb = b""
    while len(bb) < N:
        extra_bytes = f.read(N - len(bb))
        if not extra_bytes:
            break
        bb += extra_bytes
    return bb


class SeekableFileObject:
    """ A readonly wrapper file object that add support for seeking, even if
    the wrapped file object does not. This allows us to stream from http and
    still use Pillow.

    Everything read from the wrapped file is kept in an internal buffer,
    so seeking backwards is served from memory.
    """

    def __init__(self, f):
        self.f = f
        self._i = 0  # >=0 but can exceed buffer
        self._buffer = b""
        self._have_all = False
        self.closed = False

    def read(self, n=None):
        """ Read n bytes from the current position; everything that is
        left when n is None or negative.
        """
        # Fix up n
        if n is not None:
            n = int(n)
            if n < 0:
                n = None

        # Can and must we read more?
        if not self._have_all:
            more = b""
            if n is None:
                more = self.f.read()
                self._have_all = True
            else:
                want_i = self._i + n
                want_more = want_i - len(self._buffer)
                if want_more > 0:
                    more = self.f.read(want_more)
                    if len(more) < want_more:
                        self._have_all = True
            self._buffer += more

        # Read data from buffer and update pointer
        if n is None:
            res = self._buffer[self._i :]
        else:
            res = self._buffer[self._i : self._i + n]
        self._i += len(res)

        return res

    def tell(self):
        return self._i

    def seek(self, i, mode=0):
        """ Move to a new position and return it. ``mode`` is the whence:
        0 = absolute, 1 = relative to current position, 2 = relative to
        the end. Raises ValueError for a negative absolute position or
        an invalid whence.
        """
        # Mimic BytesIO behavior

        # Get the absolute new position
        i = int(i)
        if mode == 0:
            if i < 0:
                raise ValueError("negative seek value " + str(i))
            real_i = i
        elif mode == 1:
            real_i = max(0, self._i + i)  # negative ok here
        elif mode == 2:
            if not self._have_all:
                self.read()
            real_i = max(0, len(self._buffer) + i)
        else:
            # Report the offending whence value (previously the offset was
            # formatted into the message by mistake).
            raise ValueError("invalid whence (%s, should be 0, 1 or 2)" % mode)

        # Read some?
        if real_i <= len(self._buffer):
            pass  # no need to read
        elif not self._have_all:
            assert real_i > self._i  # if we don't have all, _i cannot be > _buffer
            self.read(real_i - self._i)  # sets self._i

        self._i = real_i
        return self._i

    def close(self):
        self.closed = True
        self.f.close()

    def isatty(self):
        return False

    # NOTE(review): the body of this method is elided ("...") in the visible
    # source; it is kept exactly as shown rather than guessed - confirm
    # against the upstream file.
    def seekable(self): ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful