How to use connection_made method in pytest-asyncio

Best Python code snippet using pytest-asyncio_python

Choker.py

Source:Choker.py Github

copy

Full Screen

...53 count += 154 hit = True55 else:56 u.choke()57 def connection_made(self, connection, p = None):58 if p is None:59 p = randrange(-2, len(self.connections) + 1)60 self.connections.insert(max(p, 0), connection)61 self._rechoke()62 def connection_lost(self, connection):63 self.connections.remove(connection)64 if connection.get_upload().is_interested() and not connection.get_upload().is_choked():65 self._rechoke()66 def interested(self, connection):67 if not connection.get_upload().is_choked():68 self._rechoke()69 def not_interested(self, connection):70 if not connection.get_upload().is_choked():71 self._rechoke()72 def change_max_uploads(self, newval):73 def foo(self=self, newval=newval):74 self._change_max_uploads(newval)75 self.schedule(foo, 0);76 77 def _change_max_uploads(self, newval):78 self.max_uploads = newval79 self._rechoke()80class DummyScheduler:81 def __init__(self):82 self.s = []83 def __call__(self, func, delay):84 self.s.append((func, delay))85class DummyConnection:86 def __init__(self, v = 0):87 self.u = DummyUploader()88 self.d = DummyDownloader(self)89 self.v = v90 91 def get_upload(self):92 return self.u93 def get_download(self):94 return self.d95class DummyDownloader:96 def __init__(self, c):97 self.s = False98 self.c = c99 def is_snubbed(self):100 return self.s101 def get_rate(self):102 return self.c.v103class DummyUploader:104 def __init__(self):105 self.i = False106 self.c = True107 def choke(self):108 if not self.c:109 self.c = True110 def unchoke(self):111 if self.c:112 self.c = False113 def is_choked(self):114 return self.c115 def is_interested(self):116 return self.i117def test_round_robin_with_no_downloads():118 s = DummyScheduler()119 Choker(2, s)120 assert len(s.s) == 1121 assert s.s[0][1] == 10122 s.s[0][0]()123 del s.s[0]124 assert len(s.s) == 1125 assert s.s[0][1] == 10126 s.s[0][0]()127 del s.s[0]128 s.s[0][0]()129 del s.s[0]130 s.s[0][0]()131 del s.s[0]132def test_resort():133 s = DummyScheduler()134 choker = Choker(1, s)135 c1 = DummyConnection()136 c2 = DummyConnection(1)137 c3 = DummyConnection(2)138 c4 = DummyConnection(3)139 c2.u.i = True140 c3.u.i = True141 choker.connection_made(c1)142 assert not c1.u.c143 choker.connection_made(c2, 1)144 assert not c1.u.c145 assert not c2.u.c146 choker.connection_made(c3, 1)147 assert not c1.u.c148 assert c2.u.c149 assert not c3.u.c150 c2.v = 2151 c3.v = 1152 choker.connection_made(c4, 1)153 assert not c1.u.c154 assert c2.u.c155 assert not c3.u.c156 assert not c4.u.c157 choker.connection_lost(c4)158 assert not c1.u.c159 assert c2.u.c160 assert not c3.u.c161 s.s[0][0]()162 assert not c1.u.c163 assert c2.u.c164 assert not c3.u.c165def test_interest():166 s = DummyScheduler()167 choker = Choker(1, s)168 c1 = DummyConnection()169 c2 = DummyConnection(1)170 c3 = DummyConnection(2)171 c2.u.i = True172 c3.u.i = True173 choker.connection_made(c1)174 assert not c1.u.c175 choker.connection_made(c2, 1)176 assert not c1.u.c177 assert not c2.u.c178 choker.connection_made(c3, 1)179 assert not c1.u.c180 assert c2.u.c181 assert not c3.u.c182 c3.u.i = False183 choker.not_interested(c3)184 assert not c1.u.c185 assert not c2.u.c186 assert not c3.u.c187 c3.u.i = True188 choker.interested(c3)189 assert not c1.u.c190 assert c2.u.c191 assert not c3.u.c192 choker.connection_lost(c3)193 assert not c1.u.c194 assert not c2.u.c195def test_robin_interest():196 s = DummyScheduler()197 choker = Choker(1, s)198 c1 = DummyConnection(0)199 c2 = DummyConnection(1)200 c1.u.i = True201 choker.connection_made(c2)202 assert not c2.u.c203 choker.connection_made(c1, 0)204 assert not c1.u.c205 assert c2.u.c206 c1.u.i = False207 choker.not_interested(c1)208 assert not c1.u.c209 assert not c2.u.c210 c1.u.i = True211 choker.interested(c1)212 assert not c1.u.c213 assert c2.u.c214 choker.connection_lost(c1)215 assert not c2.u.c216def test_skip_not_interested():217 s = DummyScheduler()218 choker = Choker(1, s)219 c1 = DummyConnection(0)220 c2 = DummyConnection(1)221 c3 = DummyConnection(2)222 c1.u.i = True223 c3.u.i = True224 choker.connection_made(c2)225 assert not c2.u.c226 choker.connection_made(c1, 0)227 assert not c1.u.c228 assert c2.u.c229 choker.connection_made(c3, 2)230 assert not c1.u.c231 assert c2.u.c232 assert c3.u.c233 f = s.s[0][0]234 f()235 assert not c1.u.c236 assert c2.u.c237 assert c3.u.c238 f()239 assert not c1.u.c240 assert c2.u.c241 assert c3.u.c242 f()243 assert c1.u.c244 assert c2.u.c245 assert not c3.u.c246def test_connection_lost_no_interrupt():247 s = DummyScheduler()248 choker = Choker(1, s)249 c1 = DummyConnection(0)250 c2 = DummyConnection(1)251 c3 = DummyConnection(2)252 c1.u.i = True253 c2.u.i = True254 c3.u.i = True255 choker.connection_made(c1)256 choker.connection_made(c2, 1)257 choker.connection_made(c3, 2)258 f = s.s[0][0]259 f()260 assert not c1.u.c261 assert c2.u.c262 assert c3.u.c263 f()264 assert not c1.u.c265 assert c2.u.c266 assert c3.u.c267 f()268 assert c1.u.c269 assert not c2.u.c270 assert c3.u.c271 f()272 assert c1.u.c273 assert not c2.u.c274 assert c3.u.c275 f()276 assert c1.u.c277 assert not c2.u.c278 assert c3.u.c279 choker.connection_lost(c3)280 assert c1.u.c281 assert not c2.u.c282 f()283 assert not c1.u.c284 assert c2.u.c285 choker.connection_lost(c2)286 assert not c1.u.c287def test_connection_made_no_interrupt():288 s = DummyScheduler()289 choker = Choker(1, s)290 c1 = DummyConnection(0)291 c2 = DummyConnection(1)292 c3 = DummyConnection(2)293 c1.u.i = True294 c2.u.i = True295 c3.u.i = True296 choker.connection_made(c1)297 choker.connection_made(c2, 1)298 f = s.s[0][0]299 assert not c1.u.c300 assert c2.u.c301 f()302 assert not c1.u.c303 assert c2.u.c304 f()305 assert not c1.u.c306 assert c2.u.c307 choker.connection_made(c3, 1)308 assert not c1.u.c309 assert c2.u.c310 assert c3.u.c311 f()312 assert c1.u.c313 assert c2.u.c314 assert not c3.u.c315def test_round_robin():316 s = DummyScheduler()317 choker = Choker(1, s)318 c1 = DummyConnection(0)319 c2 = DummyConnection(1)320 c1.u.i = True321 c2.u.i = True322 choker.connection_made(c1)323 choker.connection_made(c2, 1)324 f = s.s[0][0]325 assert not c1.u.c326 assert c2.u.c327 f()328 assert not c1.u.c329 assert c2.u.c330 f()331 assert not c1.u.c332 assert c2.u.c333 f()334 assert c1.u.c335 assert not c2.u.c336 f()337 assert c1.u.c338 assert not c2.u.c339 f()340 assert c1.u.c341 assert not c2.u.c342 f()343 assert not c1.u.c344 assert c2.u.c345 346def test_multi():347 s = DummyScheduler()348 choker = Choker(4, s)349 c1 = DummyConnection(0)350 c2 = DummyConnection(0)351 c3 = DummyConnection(0)352 c4 = DummyConnection(8)353 c5 = DummyConnection(0)354 c6 = DummyConnection(0)355 c7 = DummyConnection(6)356 c8 = DummyConnection(0)357 c9 = DummyConnection(9)358 c10 = DummyConnection(7)359 c11 = DummyConnection(10)360 choker.connection_made(c1, 0)361 choker.connection_made(c2, 1)362 choker.connection_made(c3, 2)363 choker.connection_made(c4, 3)364 choker.connection_made(c5, 4)365 choker.connection_made(c6, 5)366 choker.connection_made(c7, 6)367 choker.connection_made(c8, 7)368 choker.connection_made(c9, 8)369 choker.connection_made(c10, 9)370 choker.connection_made(c11, 10)371 c2.u.i = True372 c4.u.i = True373 c6.u.i = True374 c8.u.i = True375 c10.u.i = True376 c2.d.s = True377 c6.d.s = True378 c8.d.s = True379 s.s[0][0]()380 assert not c1.u.c381 assert not c2.u.c382 assert not c3.u.c383 assert not c4.u.c384 assert not c5.u.c...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run pytest-asyncio automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful