How to use register_stream_consumer method in localstack

Best Python code snippet using localstack_python

test_stream_reader.py

Source:test_stream_reader.py Github

copy

Full Screen

...14var num_events = 0;15redis.register_function("num_events", function(){16 return num_events;17})18redis.register_stream_consumer("consumer", "stream", 1, false, function(){19 num_events++;20})21 """22 env.expect('RG.FCALL', 'lib', 'num_events', '0').equal(0)23 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')24 env.expect('RG.FCALL', 'lib', 'num_events', '0').equal(1)25 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')26 env.expect('RG.FCALL', 'lib', 'num_events', '0').equal(2)27@gearsTest(decodeResponses=False)28def testBasicStreamReaderWithBinaryData(env):29 """#!js name=lib30var last_key = null;31var last_key_raw = null;32var last_data = null;33var last_data_raw = null;34redis.register_function("stats", function(){35 return [36 last_key,37 last_key_raw,38 last_data,39 last_data_raw40 ];41})42redis.register_stream_consumer("consumer", new Uint8Array([255]).buffer, 1, false, function(c, data){43 last_key = data.stream_name;44 last_key_raw = data.stream_name_raw;45 last_data = data.record;46 last_data_raw = data.record_raw;47})48 """49 env.expect('RG.FCALL', 'lib', 'stats', '0').equal([None, None, None, None])50 env.cmd('xadd', b'\xff\xff', '*', b'\xaa', b'\xaa')51 env.expect('RG.FCALL', 'lib', 'stats', '0').equal([None, b'\xff\xff', [[None, None]], [[b'\xaa', b'\xaa']]])52@gearsTest()53def testAsyncStreamReader(env):54 """#!js name=lib55var num_events = 0;56redis.register_function("num_events", function(){57 return num_events;58})59redis.register_stream_consumer("consumer", "stream", 1, false, async function(){60 num_events++;61})62 """63 env.expect('RG.FCALL', 'lib', 'num_events', '0').equal(0)64 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')65 runUntil(env, 1, lambda: env.cmd('RG.FCALL', 'lib', 'num_events', '0'))66 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')67 runUntil(env, 2, lambda: env.cmd('RG.FCALL', 'lib', 'num_events', '0'))68@gearsTest()69def testStreamTrim(env):70 """#!js name=lib71var num_events = 0;72redis.register_function("num_events", 
function(){73 return num_events;74})75redis.register_stream_consumer("consumer", "stream", 1, true, function(){76 num_events++;77})78 """79 env.expect('RG.FCALL', 'lib', 'num_events', '0').equal(0)80 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')81 env.expect('xlen', 'stream:1').equal(0)82 env.expect('RG.FCALL', 'lib', 'num_events', '0').equal(1)83 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')84 env.expect('xlen', 'stream:1').equal(0)85 env.expect('RG.FCALL', 'lib', 'num_events', '0').equal(2)86@gearsTest()87def testStreamProccessError(env):88 """#!js name=lib89redis.register_stream_consumer("consumer", "stream", 1, false, function(){90 throw 'Error';91})92 """93 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')94 res = toDictionary(env.cmd('RG.FUNCTION', 'LIST', 'vv'), 6)95 env.assertEqual('Error', res[0]['stream_consumers'][0]['streams'][0]['last_error'])96@gearsTest()97def testStreamWindow(env):98 """#!js name=lib99var promises = [];100redis.register_function("num_pending", function(){101 return promises.length;102})103redis.register_function("continue", function(){104 if (promises.length == 0) {105 throw "No pending records"106 }107 promises[0]('continue');108 promises.shift()109 return "OK"110})111redis.register_stream_consumer("consumer", "stream", 3, true, async function(){112 return await new Promise((resolve, reject) => {113 promises.push(resolve);114 });115})116 """117 env.expect('RG.FCALL', 'lib', 'num_pending', '0').equal(0)118 env.expect('RG.FCALL', 'lib', 'continue', '0').error().contains('No pending records')119 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')120 runUntil(env, 1, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))121 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')122 runUntil(env, 2, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))123 124 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')125 runUntil(env, 3, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))126 res = toDictionary(env.cmd('RG.FUNCTION', 'LIST', 'vvv'), 6)127 
env.assertEqual(3, len(res[0]['stream_consumers'][0]['streams'][0]['pending_ids']))128 env.expect('RG.FCALL', 'lib', 'continue', '0').equal('OK')129 runUntil(env, 2, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))130 131 runUntil(env, 2, lambda: len(toDictionary(env.cmd('RG.FUNCTION', 'LIST', 'vvv'), 6)[0]['stream_consumers'][0]['streams'][0]['pending_ids']))132 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')133 runUntil(env, 3, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))134 res = toDictionary(env.cmd('RG.FUNCTION', 'LIST', 'vvv'), 6)135 env.assertEqual(3, len(res[0]['stream_consumers'][0]['streams'][0]['pending_ids']))136 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')137 runFor(3, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))138 env.expect('RG.FCALL', 'lib', 'continue', '0').equal('OK')139 runUntil(env, 3, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))140 res = toDictionary(env.cmd('RG.FUNCTION', 'LIST', 'vvv'), 6)141 env.assertEqual(2, res[0]['stream_consumers'][0]['streams'][0]['total_record_processed'])142@gearsTest(envArgs={'useSlaves': True})143def testStreamWithReplication(env):144 """#!js name=lib145var promises = [];146redis.register_function("num_pending", function(){147 return promises.length;148})149redis.register_function("continue", function(){150 if (promises.length == 0) {151 throw "No pending records"152 }153 p = promises[0]154 promises.shift()155 p[1]('continue')156 id = p[0].id;157 return id[0].toString() + "-" + id[1].toString()158})159redis.register_stream_consumer("consumer", "stream", 3, true, async function(client, data){160 return await new Promise((resolve, reject) => {161 promises.push([data,resolve]);162 });163})164 """165 slave_conn = env.getSlaveConnection()166 env.expect('WAIT', '1', '7000').equal(1)167 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')168 runUntil(env, 1, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))169 res = toDictionary(env.cmd('RG.FUNCTION', 'LIST', 'vvv'), 6)170 
id_to_read_from = res[0]['stream_consumers'][0]['streams'][0]['id_to_read_from']171 env.expect('RG.FCALL', 'lib', 'continue', '0').equal(id_to_read_from)172 runUntil(env, 1, lambda: len(toDictionary(slave_conn.execute_command('RG.FUNCTION', 'LIST', 'vvv'), 6)[0]['stream_consumers'][0]['streams']))173 env.assertEqual(id_to_read_from, toDictionary(slave_conn.execute_command('RG.FUNCTION', 'LIST', 'vvv'), 6)[0]['stream_consumers'][0]['streams'][0]['id_to_read_from'])174 # add 2 more record to the stream175 id1 = env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')176 id2 = env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')177 runUntil(env, 2, lambda: slave_conn.execute_command('xlen', 'stream:1'))178 # Turn slave to master179 slave_conn.execute_command('slaveof', 'no', 'one')180 runUntil(env, 2, lambda: slave_conn.execute_command('RG.FCALL', 'lib', 'num_pending', '0'))181 def continue_function():182 try:183 return slave_conn.execute_command('RG.FCALL', 'lib', 'continue', '0')184 except Exception as e:185 return str(e)186 runUntil(env, id1, continue_function)187 runUntil(env, id2, continue_function)188@gearsTest()189def testStreamDeletoin(env):190 """#!js name=lib191var promises = [];192redis.register_function("num_pending", function(){193 return promises.length;194})195redis.register_function("continue", function(){196 if (promises.length == 0) {197 throw "No pending records"198 }199 promises[0]('continue');200 promises.shift()201 return "OK"202})203redis.register_stream_consumer("consumer", "stream", 3, true, async function(){204 return await new Promise((resolve, reject) => {205 promises.push(resolve);206 });207})208 """209 env.expect('RG.FCALL', 'lib', 'num_pending', '0').equal(0)210 env.expect('RG.FCALL', 'lib', 'continue', '0').error().contains('No pending records')211 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')212 runUntil(env, 1, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))213 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')214 runUntil(env, 2, lambda: 
env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))215 216 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')217 runUntil(env, 3, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))218 env.expect('del', 'stream:1').equal(1)219 env.expect('RG.FCALL', 'lib', 'continue', '0').equal('OK')220 runUntil(env, 2, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))221 env.expect('RG.FCALL', 'lib', 'continue', '0').equal('OK')222 runUntil(env, 1, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))223 env.expect('RG.FCALL', 'lib', 'continue', '0').equal('OK')224 runUntil(env, 0, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))225 res = env.cmd('RG.FUNCTION', 'LIST', 'vvv')226 env.assertEqual(0, len(toDictionary(res, 6)[0]['stream_consumers'][0]['streams']))227@gearsTest()228def testFlushall(env):229 """#!js name=lib230var promises = [];231redis.register_function("num_pending", function(){232 return promises.length;233})234redis.register_function("continue", function(){235 if (promises.length == 0) {236 throw "No pending records"237 }238 promises[0]('continue');239 promises.shift()240 return "OK"241})242redis.register_stream_consumer("consumer", "stream", 3, true, async function(){243 return await new Promise((resolve, reject) => {244 promises.push(resolve);245 });246})247 """248 env.expect('RG.FCALL', 'lib', 'num_pending', '0').equal(0)249 env.expect('RG.FCALL', 'lib', 'continue', '0').error().contains('No pending records')250 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')251 runUntil(env, 1, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))252 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')253 runUntil(env, 2, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))254 255 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')256 runUntil(env, 3, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))257 env.expect('flushall').equal(True)258 env.expect('RG.FCALL', 'lib', 'continue', '0').equal('OK')259 runUntil(env, 2, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', 
'0'))260 env.expect('RG.FCALL', 'lib', 'continue', '0').equal('OK')261 runUntil(env, 1, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))262 env.expect('RG.FCALL', 'lib', 'continue', '0').equal('OK')263 runUntil(env, 0, lambda: env.cmd('RG.FCALL', 'lib', 'num_pending', '0'))264 res = env.cmd('RG.FUNCTION', 'LIST', 'vvv')265 env.assertEqual(0, len(toDictionary(res, 6)[0]['stream_consumers'][0]['streams']))266@gearsTest()267def testMultipleConsumers(env):268 script = """#!js name=%s269var promises = [];270redis.register_function("num_pending", function(){271 return promises.length;272})273redis.register_function("continue", function(){274 if (promises.length == 0) {275 throw "No pending records"276 }277 promises[0]('continue');278 promises.shift()279 return "OK"280})281redis.register_stream_consumer("consumer", "stream", 3, true, async function(){282 return await new Promise((resolve, reject) => {283 promises.push(resolve);284 });285})286 """287 env.expect('RG.FUNCTION', 'LOAD', script % 'lib1').equal('OK')288 env.expect('RG.FUNCTION', 'LOAD', script % 'lib2').equal('OK')289 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')290 runUntil(env, 1, lambda: env.cmd('RG.FCALL', 'lib1', 'num_pending', '0'))291 runUntil(env, 1, lambda: env.cmd('RG.FCALL', 'lib2', 'num_pending', '0'))292 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')293 runUntil(env, 2, lambda: env.cmd('RG.FCALL', 'lib1', 'num_pending', '0'))294 runUntil(env, 2, lambda: env.cmd('RG.FCALL', 'lib2', 'num_pending', '0'))295 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')296 runUntil(env, 3, lambda: env.cmd('RG.FCALL', 'lib1', 'num_pending', '0'))297 runUntil(env, 3, lambda: env.cmd('RG.FCALL', 'lib2', 'num_pending', '0'))298 env.expect('RG.FCALL', 'lib1', 'continue', '0').equal('OK')299 runUntil(env, 2, lambda: env.cmd('RG.FCALL', 'lib1', 'num_pending', '0'))300 runUntil(env, 3, lambda: env.cmd('RG.FCALL', 'lib2', 'num_pending', '0'))301 runFor(3, lambda: env.cmd('XLEN', 'stream:1')) # make sure not trimming302 
env.expect('RG.FCALL', 'lib2', 'continue', '0').equal('OK')303 runUntil(env, 2, lambda: env.cmd('RG.FCALL', 'lib1', 'num_pending', '0'))304 runUntil(env, 2, lambda: env.cmd('RG.FCALL', 'lib2', 'num_pending', '0'))305 runUntil(env, 2, lambda: env.cmd('XLEN', 'stream:1'))306 env.expect('RG.FCALL', 'lib1', 'continue', '0').equal('OK')307 env.expect('RG.FCALL', 'lib1', 'continue', '0').equal('OK')308 runUntil(env, 0, lambda: env.cmd('RG.FCALL', 'lib1', 'num_pending', '0'))309 runUntil(env, 2, lambda: env.cmd('RG.FCALL', 'lib2', 'num_pending', '0'))310 runFor(2, lambda: env.cmd('XLEN', 'stream:1')) # make sure not trimming311 env.expect('RG.FCALL', 'lib2', 'continue', '0').equal('OK')312 env.expect('RG.FCALL', 'lib2', 'continue', '0').equal('OK')313 runUntil(env, 0, lambda: env.cmd('RG.FCALL', 'lib1', 'num_pending', '0'))314 runUntil(env, 0, lambda: env.cmd('RG.FCALL', 'lib2', 'num_pending', '0'))315 runUntil(env, 0, lambda: env.cmd('XLEN', 'stream:1'))316@gearsTest()317def testMultipleStreamsForConsumer(env):318 """#!js name=lib319var streams = [];320redis.register_function("get_stream", function(){321 if (streams.length == 0) {322 throw "No streams"323 }324 let name = streams[0];325 streams.shift();326 return name327})328redis.register_stream_consumer("consumer", "stream", 1, true, async function(client, data){329 streams.push(data.stream_name)330})331 """332 env.expect('RG.FCALL', 'lib', 'get_stream', '0').error().contains('No streams')333 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')334 env.cmd('xadd', 'stream:2', '*', 'foo', 'bar')335 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')336 env.cmd('xadd', 'stream:2', '*', 'foo', 'bar')337 runUntil(env, 'stream:1', lambda: env.cmd('RG.FCALL', 'lib', 'get_stream', '0'), timeout=1)338 runUntil(env, 'stream:2', lambda: env.cmd('RG.FCALL', 'lib', 'get_stream', '0'), timeout=1)339 runUntil(env, 'stream:1', lambda: env.cmd('RG.FCALL', 'lib', 'get_stream', '0'), timeout=1)340 runUntil(env, 'stream:2', lambda: 
env.cmd('RG.FCALL', 'lib', 'get_stream', '0'), timeout=1)341@gearsTest()342def testRDBSaveAndLoad(env):343 """#!js name=lib344redis.register_stream_consumer("consumer", "stream", 1, false, async function(client, data){345 redis.log(data.id);346})347 """348 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')349 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')350 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')351 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')352 runUntil(env, 4, lambda: toDictionary(env.execute_command('RG.FUNCTION', 'LIST', 'vvv'), 6)[0]['stream_consumers'][0]['streams'][0]['total_record_processed'])353 id_to_read_from1 = toDictionary(env.execute_command('RG.FUNCTION', 'LIST', 'vvv'), 6)[0]['stream_consumers'][0]['streams'][0]['id_to_read_from']354 env.expect('DEBUG', 'RELOAD').equal('OK')355 id_to_read_from2 = toDictionary(env.execute_command('RG.FUNCTION', 'LIST', 'vvv'), 6)[0]['stream_consumers'][0]['streams'][0]['id_to_read_from']356 env.assertEqual(id_to_read_from1, id_to_read_from2)357@gearsTest()358def testCallingRedisCommandOnStreamConsumer(env):359 """#!js name=lib360redis.register_stream_consumer("consumer", "stream", 1, false, function(client, data){361 client.call('ping');362})363 """364 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')365 366 runUntil(env, 1, lambda: toDictionary(env.execute_command('RG.FUNCTION', 'LIST', 'vvv'), 6)[0]['stream_consumers'][0]['streams'][0]['total_record_processed'])367 env.assertEqual('None', toDictionary(env.execute_command('RG.FUNCTION', 'LIST', 'vvv'), 6)[0]['stream_consumers'][0]['streams'][0]['last_error'])368@gearsTest()369def testBecomeReplicaWhileProcessingData(env):370 """#!js name=lib371var promise = null;372var done = null;373redis.register_function("continue_process", async function(){374 if (promise == null) {375 return "no data to processes";376 }377 var p = new Promise((resume, reject) => {378 done = resume;379 });380 promise("continue");381 promise = null;382 return await 
p;383},384['no-writes'])385redis.register_stream_consumer("consumer", "stream", 1, true, async function(client) {386 await new Promise((resume, reject) => {387 promise = resume;388 });389 done("OK");390}) 391 """392 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')393 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')394 env.cmd('xadd', 'stream:1', '*', 'foo', 'bar')395 runUntil(env, 'OK', lambda: env.cmd('RG.FCALL', 'lib', 'continue_process', '0'))396 runUntil(env, 1, lambda: toDictionary(toDictionary(env.execute_command('RG.FUNCTION', 'LIST', 'vvv'), 4)[0]['stream_consumers'][0]['streams'][0], 1)['total_record_processed'])397 # Turn into a slave398 env.cmd('slaveof', '127.0.0.1', '3300')399 runUntil(env, 'OK', lambda: env.cmd('RG.FCALL', 'lib', 'continue_process', '0'))...

Full Screen

Full Screen

test_eventstreams.py

Source:test_eventstreams.py Github

copy

Full Screen

...31 for k in keys:32 await kinesis_client.put_record(33 StreamName=stream_name, Data=k, PartitionKey=k34 )35 register_response = await kinesis_client.register_stream_consumer(36 StreamARN=stream_arn, ConsumerName=consumer_name37 )38 consumer_arn = register_response['Consumer']['ConsumerARN']39 while (40 describe_response := (41 await kinesis_client.describe_stream_consumer( # noqa: E231, E999, E251, E50142 StreamARN=stream_arn,43 ConsumerName=consumer_name,44 ConsumerARN=consumer_arn,45 )46 )47 ) and describe_response['ConsumerDescription'][48 'ConsumerStatus'49 ] == 'CREATING':50 print("Waiting for stream consumer creation")51 await asyncio.sleep(1)52 starting_position = {'Type': 'LATEST'}53 subscribe_response = await kinesis_client.subscribe_to_shard(54 ConsumerARN=consumer_arn,55 ShardId=shard_id,56 StartingPosition=starting_position,57 )58 async for event in subscribe_response['EventStream']:59 assert event['SubscribeToShardEvent']['Records'] == []60 break61 finally:62 if consumer_arn:63 await kinesis_client.deregister_stream_consumer(64 StreamARN=stream_arn,65 ConsumerName=consumer_name,66 ConsumerARN=consumer_arn,67 )...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful