How to use the readinto method in localstack

Best Python code snippet using localstack_python

test_file.py

Source:test_file.py Github

copy

Full Screen

...34 self.f.write(b'12')35 self.f.close()36 a = array('b', b'x'*10)37 self.f = self.open(TESTFN, 'rb')38 n = self.f.readinto(a)39 self.assertEqual(b'12', a.tobytes()[:n])40 def testReadinto_text(self):41 # verify readinto refuses text files42 a = array('b', b'x'*10)43 self.f.close()44 self.f = self.open(TESTFN, 'r')45 if hasattr(self.f, "readinto"):46 self.assertRaises(TypeError, self.f.readinto, a)47 def testWritelinesUserList(self):48 # verify writelines with instance sequence49 l = UserList([b'1', b'2'])50 self.f.writelines(l)51 self.f.close()52 self.f = self.open(TESTFN, 'rb')53 buf = self.f.read()54 self.assertEqual(buf, b'12')55 def testWritelinesIntegers(self):56 # verify writelines with integers57 self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])58 def testWritelinesIntegersUserList(self):59 # verify writelines with integers in UserList60 l = UserList([1,2,3])61 self.assertRaises(TypeError, self.f.writelines, l)62 def testWritelinesNonString(self):63 # verify writelines with non-string object64 class NonString:65 pass66 self.assertRaises(TypeError, self.f.writelines,67 [NonString(), NonString()])68 def testErrors(self):69 f = self.f70 self.assertEqual(f.name, TESTFN)71 self.assertFalse(f.isatty())72 self.assertFalse(f.closed)73 if hasattr(f, "readinto"):74 self.assertRaises((OSError, TypeError), f.readinto, "")75 f.close()76 self.assertTrue(f.closed)77 def testMethods(self):78 methods = [('fileno', ()),79 ('flush', ()),80 ('isatty', ()),81 ('__next__', ()),82 ('read', ()),83 ('write', (b"",)),84 ('readline', ()),85 ('readlines', ()),86 ('seek', (0,)),87 ('tell', ()),88 ('write', (b"",)),89 ('writelines', ([],)),90 ('__iter__', ()),91 ]92 methods.append(('truncate', ()))93 # __exit__ should close the file94 self.f.__exit__(None, None, None)95 self.assertTrue(self.f.closed)96 for methodname, args in methods:97 method = getattr(self.f, methodname)98 # should raise on closed file99 self.assertRaises(ValueError, method, *args)100 # file is closed, 
__exit__ shouldn't do anything101 self.assertEqual(self.f.__exit__(None, None, None), None)102 # it must also return None if an exception was given103 try:104 1/0105 except:106 self.assertEqual(self.f.__exit__(*sys.exc_info()), None)107 def testReadWhenWriting(self):108 self.assertRaises(OSError, self.f.read)109class CAutoFileTests(AutoFileTests, unittest.TestCase):110 open = io.open111class PyAutoFileTests(AutoFileTests, unittest.TestCase):112 open = staticmethod(pyio.open)113class OtherFileTests:114 def testModeStrings(self):115 # check invalid mode strings116 for mode in ("", "aU", "wU+", "U+", "+U", "rU+"):117 try:118 f = self.open(TESTFN, mode)119 except ValueError:120 pass121 else:122 f.close()123 self.fail('%r is an invalid file mode' % mode)124 def testBadModeArgument(self):125 # verify that we get a sensible error message for bad mode argument126 bad_mode = "qwerty"127 try:128 f = self.open(TESTFN, bad_mode)129 except ValueError as msg:130 if msg.args[0] != 0:131 s = str(msg)132 if TESTFN in s or bad_mode not in s:133 self.fail("bad error message for invalid mode: %s" % s)134 # if msg.args[0] == 0, we're probably on Windows where there may be135 # no obvious way to discover why open() failed.136 else:137 f.close()138 self.fail("no error for invalid mode: %s" % bad_mode)139 def testSetBufferSize(self):140 # make sure that explicitly setting the buffer size doesn't cause141 # misbehaviour especially with repeated close() calls142 for s in (-1, 0, 1, 512):143 try:144 f = self.open(TESTFN, 'wb', s)145 f.write(str(s).encode("ascii"))146 f.close()147 f.close()148 f = self.open(TESTFN, 'rb', s)149 d = int(f.read().decode("ascii"))150 f.close()151 f.close()152 except OSError as msg:153 self.fail('error setting buffer size %d: %s' % (s, str(msg)))154 self.assertEqual(d, s)155 def testTruncateOnWindows(self):156 # SF bug <http://www.python.org/sf/801631>157 # "file.truncate fault on windows"158 os.unlink(TESTFN)159 f = self.open(TESTFN, 'wb')160 try:161 
f.write(b'12345678901') # 11 bytes162 f.close()163 f = self.open(TESTFN,'rb+')164 data = f.read(5)165 if data != b'12345':166 self.fail("Read on file opened for update failed %r" % data)167 if f.tell() != 5:168 self.fail("File pos after read wrong %d" % f.tell())169 f.truncate()170 if f.tell() != 5:171 self.fail("File pos after ftruncate wrong %d" % f.tell())172 f.close()173 size = os.path.getsize(TESTFN)174 if size != 5:175 self.fail("File size after ftruncate wrong %d" % size)176 finally:177 f.close()178 os.unlink(TESTFN)179 def testIteration(self):180 # Test the complex interaction when mixing file-iteration and the181 # various read* methods.182 dataoffset = 16384183 filler = b"ham\n"184 assert not dataoffset % len(filler), \185 "dataoffset must be multiple of len(filler)"186 nchunks = dataoffset // len(filler)187 testlines = [188 b"spam, spam and eggs\n",189 b"eggs, spam, ham and spam\n",190 b"saussages, spam, spam and eggs\n",191 b"spam, ham, spam and eggs\n",192 b"spam, spam, spam, spam, spam, ham, spam\n",193 b"wonderful spaaaaaam.\n"194 ]195 methods = [("readline", ()), ("read", ()), ("readlines", ()),196 ("readinto", (array("b", b" "*100),))]197 try:198 # Prepare the testfile199 bag = self.open(TESTFN, "wb")200 bag.write(filler * nchunks)201 bag.writelines(testlines)202 bag.close()203 # Test for appropriate errors mixing read* and iteration204 for methodname, args in methods:205 f = self.open(TESTFN, 'rb')206 if next(f) != filler:207 self.fail, "Broken testfile"208 meth = getattr(f, methodname)209 meth(*args) # This simply shouldn't fail210 f.close()211 # Test to see if harmless (by accident) mixing of read* and212 # iteration still works. This depends on the size of the internal213 # iteration buffer (currently 8192,) but we can test it in a214 # flexible manner. 
Each line in the bag o' ham is 4 bytes215 # ("h", "a", "m", "\n"), so 4096 lines of that should get us216 # exactly on the buffer boundary for any power-of-2 buffersize217 # between 4 and 16384 (inclusive).218 f = self.open(TESTFN, 'rb')219 for i in range(nchunks):220 next(f)221 testline = testlines.pop(0)222 try:223 line = f.readline()224 except ValueError:225 self.fail("readline() after next() with supposedly empty "226 "iteration-buffer failed anyway")227 if line != testline:228 self.fail("readline() after next() with empty buffer "229 "failed. Got %r, expected %r" % (line, testline))230 testline = testlines.pop(0)231 buf = array("b", b"\x00" * len(testline))232 try:233 f.readinto(buf)234 except ValueError:235 self.fail("readinto() after next() with supposedly empty "236 "iteration-buffer failed anyway")237 line = buf.tobytes()238 if line != testline:239 self.fail("readinto() after next() with empty buffer "240 "failed. Got %r, expected %r" % (line, testline))241 testline = testlines.pop(0)242 try:243 line = f.read(len(testline))244 except ValueError:245 self.fail("read() after next() with supposedly empty "246 "iteration-buffer failed anyway")247 if line != testline:248 self.fail("read() after next() with empty buffer "249 "failed. Got %r, expected %r" % (line, testline))250 try:251 lines = f.readlines()252 except ValueError:253 self.fail("readlines() after next() with supposedly empty "254 "iteration-buffer failed anyway")255 if lines != testlines:256 self.fail("readlines() after next() with empty buffer "257 "failed. 
Got %r, expected %r" % (line, testline))258 f.close()259 # Reading after iteration hit EOF shouldn't hurt either260 f = self.open(TESTFN, 'rb')261 try:262 for line in f:263 pass264 try:265 f.readline()266 f.readinto(buf)267 f.read()268 f.readlines()269 except ValueError:270 self.fail("read* failed after next() consumed file")271 finally:272 f.close()273 finally:274 os.unlink(TESTFN)275class COtherFileTests(OtherFileTests, unittest.TestCase):276 open = io.open277class PyOtherFileTests(OtherFileTests, unittest.TestCase):278 open = staticmethod(pyio.open)279def tearDownModule():280 # Historically, these tests have been sloppy about removing TESTFN....

Full Screen

Full Screen

signed-transaction.ts

Source:signed-transaction.ts Github

copy

Full Screen

1import { Bool, BytesReader, Codec, CompactInt } from "as-scale-codec";2import { Extrinsic, ExtrinsicType } from "./extrinsic";3/**4 * @description Class representing an Extrinsic in the Substrate Runtime5 */6export class SignedTransaction<Address extends Codec, A extends Codec, N extends Codec, S extends Codec> 7 extends Extrinsic{8 9 /**10 * from address 11 */12 public from: Address13 14 /**15 * to address16 */17 public to: Address18 /**19 * amount of the transfer20 */21 public amount: A22 /**23 * nonce of the transaction24 */25 public nonce: N26 /**27 * the signature of the transaction (64 byte array)28 */29 public signature: S;30 /**31 * Determines whether to exhaust the gas. Default false32 */33 public exhaustResourcesWhenNotFirst: Bool34 constructor(35 from: Address = instantiate<Address>(), 36 to: Address = instantiate<Address>(), 37 amount: A = instantiate<A>(), 38 nonce: N = instantiate<N>(), 39 signature: S = instantiate<S>(), 40 exhaustResourcesWhenNotFirst: Bool = new Bool()) 41 {42 super(ExtrinsicType.SignedTransaction);43 this.from = from;44 this.to = to;45 this.amount = amount;46 this.nonce = nonce;47 this.signature = signature;48 this.exhaustResourcesWhenNotFirst = exhaustResourcesWhenNotFirst;49 }50 /**51 * @description SCALE Encodes the Header into u8[]52 */53 toU8a(): u8[] {54 let len = new CompactInt(ExtrinsicType.SignedTransaction);55 return len.toU8a()56 .concat(this.from.toU8a())57 .concat(this.to.toU8a())58 .concat(this.amount.toU8a())59 .concat(this.nonce.toU8a())60 .concat(this.signature.toU8a())61 .concat(this.exhaustResourcesWhenNotFirst.toU8a());62 }63 /**64 * @description Get type id of the Extrinsic65 */66 getTypeId(): i32{67 return <i32>this.typeId;68 }69 /**70 * @description get SCALE encoded bytes for the Transfer instance71 */72 getTransferBytes(): u8[]{73 return this.from.toU8a()74 .concat(this.to.toU8a())75 .concat(this.amount.toU8a())76 .concat(this.nonce.toU8a())77 }78 /**79 * @description Get amount of transaction80 */81 
getAmount(): A{82 return this.amount;83 }84 /**85 * @description Get nonce of the transaction86 */87 getNonce(): N{88 return this.nonce;89 }90 /**91 * @description Get signature of the transaction92 */93 getSignature(): S{94 return this.signature;95 }96 /**97 * @description Get sender of the transaction98 */99 getFrom(): Address{100 return this.from;101 }102 /**103 * @description Get receiver of the transaction104 */105 getTo(): Address{106 return this.to;107 }108 /**109 * @description Converts to SCALE encoded bytes110 */111 encodedLength(): i32{112 return this.toU8a().length;113 }114 /**115 * @description Checks if this instance is equal to other instance116 * @param other 117 */118 eq(other: SignedTransaction<Address, A, N, S>): bool{119 return this.from == other.from 120 && this.to == other.to121 && this.amount == other.amount122 && this.signature == other.signature;123 }124 /**125 * @description Checks if this instance is not equal to other instance126 * @param other 127 */128 notEq(other: SignedTransaction<Address, A, N, S>): bool{129 return !this.eq(other);130 }131 /**132 * @description Non static constructor from bytes133 * @param bytes SCALE encoded bytes134 * @param index starting index135 */136 populateFromBytes(bytes: u8[], index: i32 = 0): void {137 assert(bytes.length - index > this.getTypeId(), "SignedTransaction: Bytes array with insufficient length");138 const bytesReader = new BytesReader(bytes.slice(index));139 let length = bytesReader.readInto<CompactInt>();140 assert(<i32>length.unwrap() == this.typeId, "SignedTransaction: Incorrectly encoded SignedTransaction");141 this.from = bytesReader.readInto<Address>();142 this.to = bytesReader.readInto<Address>();143 this.amount = bytesReader.readInto<A>();144 this.nonce = bytesReader.readInto<N>();145 this.signature = bytesReader.readInto<S>();146 }147 /**148 * @description Instanciates new Extrinsic object from SCALE encoded byte array149 * @param input - SCALE encoded Extrinsic150 */151 static 
fromU8Array<Address extends Codec, A extends Codec, N extends Codec, S extends Codec>(input: u8[], index: i32 = 0): Extrinsic {152 assert(input.length - index >= ExtrinsicType.SignedTransaction, "Extrinsic: Invalid bytes provided. EOF");153 const bytesReader = new BytesReader(input.slice(index));154 const from = bytesReader.readInto<Address>();155 const to = bytesReader.readInto<Address>();156 const amount = bytesReader.readInto<A>();157 const nonce = bytesReader.readInto<N>();158 const signature = bytesReader.readInto<S>();159 const exhaustResourcesWhenNotFirst = bytesReader.readInto<Bool>();160 return new SignedTransaction(from, to, amount, nonce, signature, exhaustResourcesWhenNotFirst);161 }...

Full Screen

Full Screen

spi.py

Source:spi.py Github

copy

Full Screen

...32SPI(0, SPI.MASTER, polarity=0, phase=0, pins=('GP31', 'GP16', 'GP15'))33spi = SPI(0, SPI.MASTER, baudrate=10000000, polarity=0, phase=0, pins=spi_pins)34print(spi.write('123456') == 6)35buffer_r = bytearray(10)36print(spi.readinto(buffer_r) == 10)37print(spi.readinto(buffer_r, write=0x55) == 10)38read = spi.read(10)39print(len(read) == 10)40read = spi.read(10, write=0xFF)41print(len(read) == 10)42buffer_w = bytearray([1, 2, 3, 4, 5, 6, 7, 8, 9, 0])43print(spi.write_readinto(buffer_w, buffer_r) == 10)44print(buffer_w == buffer_r)45# test all polaritiy and phase combinations46spi.init(polarity=1, phase=0, pins=None)47buffer_r = bytearray(10)48spi.write_readinto(buffer_w, buffer_r)49print(buffer_w == buffer_r)50spi.init(polarity=1, phase=1, pins=None)51buffer_r = bytearray(10)52spi.write_readinto(buffer_w, buffer_r)53print(buffer_w == buffer_r)54spi.init(polarity=0, phase=1, pins=None)55buffer_r = bytearray(10)56spi.write_readinto(buffer_w, buffer_r)57print(buffer_w == buffer_r)58# test 16 and 32 bit transfers59buffer_w = bytearray([1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2])60buffer_r = bytearray(12)61spi.init(SPI.MASTER, baudrate=10000000, bits=16, polarity=0, phase=0, pins=None)62print(spi.write_readinto(buffer_w, buffer_r) == 12)63print(buffer_w == buffer_r)64buffer_r = bytearray(12)65spi.init(SPI.MASTER, baudrate=10000000, bits=32, polarity=0, phase=0, pins=None)66print(spi.write_readinto(buffer_w, buffer_r) == 12)67print(buffer_w == buffer_r)68# check for memory leaks...69for i in range (0, 1000):70 spi = SPI(0, SPI.MASTER, baudrate=1000000)71# test deinit72spi = SPI(0, SPI.MASTER, baudrate=1000000)73spi.deinit()74print(spi)75spi = SPI(0, SPI.MASTER, baudrate=1000000)76# next ones must fail77try:78 spi = SPI(0, 10, baudrate=10000000, polarity=0, phase=0)79except:80 print("Exception")81try:82 spi = SPI(0, mode=SPI.MASTER, baudrate=10000000, polarity=1, phase=2)83except:84 print("Exception")85try:86 spi = SPI(1, mode=SPI.MASTER, baudrate=10000000, polarity=1, 
phase=1)87except:88 print("Exception")89try:90 spi = SPI(0, mode=SPI.MASTER, baudrate=2000000, polarity=2, phase=0)91except:92 print("Exception")93try:94 spi = SPI(0, mode=SPI.MASTER, baudrate=2000000, polarity=2, phase=0, firstbit=2)95except:96 print("Exception")97try:98 spi = SPI(0, mode=SPI.MASTER, baudrate=2000000, polarity=2, phase=0, pins=('GP1', 'GP2'))99except:100 print("Exception")101try:102 spi = SPI(0, mode=SPI.MASTER, baudrate=2000000, polarity=0, phase=0, bits=9)103except:104 print("Exception")105spi.deinit()106try:107 spi.read(15)108except Exception:109 print("Exception")110try:111 spi.spi.readinto(buffer_r)112except Exception:113 print("Exception")114try:115 spi.spi.write('abc')116except Exception:117 print("Exception")118try:119 spi.write_readinto(buffer_w, buffer_r)120except Exception:121 print("Exception")122# reinitialization must work123spi.init(baudrate=500000)...

Full Screen

Full Screen

transfer.ts

Source:transfer.ts Github

copy

Full Screen

1import { BytesReader, Codec, CompactInt } from "as-scale-codec";2import { Utils } from "../../utils";3/**4 * @description Class representing a Transfer in the Substrate Runtime5 */6export class Transfer<Address extends Codec, A extends Codec, N extends Codec> 7 implements Codec {8 9 /**10 * from address 11 */12 public from: Address13 14 /**15 * to address16 */17 public to: Address18 /**19 * amount of the transfer20 */21 public amount: A22 /**23 * nonce of the transaction24 */25 public nonce: N26 constructor(27 from: Address = instantiate<Address>(), 28 to: Address = instantiate<Address>(), 29 amount: A = instantiate<A>(), 30 nonce: N = instantiate<N>()) 31 {32 this.from = from;33 this.to = to;34 this.amount = amount;35 this.nonce = nonce;36 }37 /**38 * @description SCALE Encodes the Header into u8[]39 */40 toU8a(): u8[] {41 const result: u8[] = this.from.toU8a()42 .concat(this.to.toU8a())43 .concat(this.amount.toU8a())44 .concat(this.nonce.toU8a());45 return Utils.encodeCompact(result);46 }47 /**48 * @description get SCALE encoded bytes for the Transfer instance49 */50 getTransferBytes(): u8[]{51 return this.from.toU8a()52 .concat(this.to.toU8a())53 .concat(this.amount.toU8a())54 .concat(this.nonce.toU8a())55 }56 /**57 * @description Get amount of transaction58 */59 getAmount(): A{60 return this.amount;61 }62 /**63 * @description Get nonce of the transaction64 */65 getNonce(): N{66 return this.nonce;67 }68 /**69 * @description Get sender of the transaction70 */71 getFrom(): Address{72 return this.from;73 }74 /**75 * @description Get receiver of the transaction76 */77 getTo(): Address{78 return this.to;79 }80 /**81 * @description Gets bytes length of the instance82 */83 encodedLength(): i32{84 return this.toU8a().length;85 }86 /**87 * @description Checks if this instance is equal to other instance88 * @param other 89 */90 eq(other: Transfer<Address, A, N>): bool{91 return this.from.eq(other.from) 92 && this.to.eq(other.to)93 && this.amount.eq(other.amount)94 && 
this.nonce.eq(other.nonce);95 }96 /**97 * @description Checks if this instance is not equal to other instance98 * @param other 99 */100 notEq(other: Transfer<Address, A, N>): bool{101 return !this.eq(other);102 }103 /**104 * @description Non static constructor from bytes105 * @param bytes SCALE encoded bytes106 * @param index starting index107 */108 populateFromBytes(bytes: u8[], index: i32 = 0): void {109 const bytesReader = new BytesReader(bytes.slice(index));110 let _length = bytesReader.readInto<CompactInt>();111 this.from = bytesReader.readInto<Address>();112 this.to = bytesReader.readInto<Address>();113 this.amount = bytesReader.readInto<A>();114 this.nonce = bytesReader.readInto<N>();115 }116 /**117 * @description Instanciates new Transfer object from SCALE encoded byte array118 * @param input - SCALE encoded Transfer119 */120 static fromU8Array<Address extends Codec, A extends Codec, N extends Codec>(input: u8[], index: i32 = 0): Transfer<Address, A, N> {121 const bytesReader = new BytesReader(input.slice(index));122 let _length = bytesReader.readInto<CompactInt>();123 const from = bytesReader.readInto<Address>();124 const to = bytesReader.readInto<Address>();125 const amount = bytesReader.readInto<A>();126 const nonce = bytesReader.readInto<N>();127 return new Transfer(from, to, amount, nonce);128 }...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful