How to use generate_processor_script method in localstack

Best Python code snippet using localstack_python

kinesis_connector.py

Source:kinesis_connector.py Github

copy

Full Screen

...286 stream = KinesisStream(id=stream_name, params=stream_info)287 thread_consumer = KinesisProcessorThread.start_consumer(stream)288 TMP_THREADS.append(thread_consumer)289 return thread_consumer290def generate_processor_script(events_file, log_file=None):291 script_file = os.path.join(tempfile.gettempdir(), 'kclipy.%s.processor.py' % short_uid())292 if log_file:293 log_file = "'%s'" % log_file294 else:295 log_file = 'None'296 content = """#!/usr/bin/env python297import os, sys, glob, json, socket, time, logging, tempfile298import subprocess32 as subprocess299logging.basicConfig(level=logging.INFO)300for path in glob.glob('%s/lib/python*/site-packages'):301 sys.path.insert(0, path)302sys.path.insert(0, '%s')303from localstack.config import DEFAULT_ENCODING304from localstack.utils.kinesis import kinesis_connector305from localstack.utils.common import timestamp306events_file = '%s'307log_file = %s308error_log = os.path.join(tempfile.gettempdir(), 'kclipy.error.log')309if __name__ == '__main__':310 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)311 num_tries = 3312 sleep_time = 2313 error = None314 for i in range(0, num_tries):315 try:316 sock.connect(events_file)317 error = None318 break319 except Exception as e:320 error = e321 if i < num_tries:322 msg = '%%s: Unable to connect to UNIX socket. Retrying.' 
%% timestamp()323 subprocess.check_output('echo "%%s" >> %%s' %% (msg, error_log), shell=True)324 time.sleep(sleep_time)325 if error:326 print("WARN: Unable to connect to UNIX socket after retrying: %%s" %% error)327 raise error328 def receive_msg(records, checkpointer, shard_id):329 try:330 # records is a list of amazon_kclpy.messages.Record objects -> convert to JSON331 records_dicts = [j._json_dict for j in records]332 message_to_send = {'shard_id': shard_id, 'records': records_dicts}333 string_to_send = '%%s\\n' %% json.dumps(message_to_send)334 bytes_to_send = string_to_send.encode(DEFAULT_ENCODING)335 sock.send(bytes_to_send)336 except Exception as e:337 msg = "WARN: Unable to forward event: %%s" %% e338 print(msg)339 subprocess.check_output('echo "%%s" >> %%s' %% (msg, error_log), shell=True)340 kinesis_connector.KinesisProcessor.run_processor(log_file=log_file, processor_func=receive_msg)341 """ % (LOCALSTACK_VENV_FOLDER, LOCALSTACK_ROOT_FOLDER, events_file, log_file)342 save_file(script_file, content)343 chmod_r(script_file, 0o755)344 TMP_FILES.append(script_file)345 return script_file346def listen_to_kinesis(stream_name, listener_func=None, processor_script=None,347 events_file=None, endpoint_url=None, log_file=None, configs={}, env=None,348 ddb_lease_table_suffix=None, env_vars={}, kcl_log_level=DEFAULT_KCL_LOG_LEVEL,349 log_subscribers=[], wait_until_started=False, fh_d_stream=None):350 """351 High-level function that allows to subscribe to a Kinesis stream352 and receive events in a listener function. 
A KCL client process is353 automatically started in the background.354 """355 env = aws_stack.get_environment(env)356 if not events_file:357 events_file = EVENTS_FILE_PATTERN.replace('*', short_uid())358 TMP_FILES.append(events_file)359 if not processor_script:360 processor_script = generate_processor_script(events_file, log_file=log_file)361 run('rm -f %s' % events_file)362 # start event reader thread (this process)363 ready_mutex = threading.Semaphore(0)364 thread = EventFileReaderThread(events_file, listener_func, ready_mutex=ready_mutex, fh_d_stream=fh_d_stream)365 thread.start()366 # Wait until the event reader thread is ready (to avoid 'Connection refused' error on the UNIX socket)367 ready_mutex.acquire()368 # start KCL client (background process)369 if processor_script[-4:] == '.pyc':370 processor_script = processor_script[0:-1]371 # add log listener that notifies when KCL is started372 if wait_until_started:373 listener = KclStartedLogListener()374 log_subscribers.append(listener)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub — from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful