How to use the start_kcl_client_process method in localstack

Best Python code snippet using localstack_python

kinesis_connector.py

Source: kinesis_connector.py (GitHub)

copy

Full Screen

# ... (the excerpt begins inside the preceding function, whose opening lines are
# not shown; its visible tail is kept below as comments because neither its
# header nor its original indentation can be recovered from the flattened listing)
#     stream_info['conn_kwargs']['host'] = url.hostname
#     stream_info['conn_kwargs']['port'] = url.port
#     stream_info['conn_kwargs']['is_secure'] = url.scheme == 'https'
#     return stream_info

# NOTE(review): the listing below was reconstructed from a flattened page excerpt
# (fused line numbers, no indentation). Block nesting was inferred from the syntax;
# confirm against the upstream kinesis_connector.py before relying on it.


def start_kcl_client_process(stream_name, listener_script, log_file=None, env=None, configs=None,
        endpoint_url=None, ddb_lease_table_suffix=None, env_vars=None,
        kcl_log_level=DEFAULT_KCL_LOG_LEVEL, log_subscribers=None):
    """
    Write a KCL properties file for the given stream and start a consumer thread for it.

    :param stream_name: name of the Kinesis stream to consume
    :param listener_script: executable handed to the KCL config as `executableName`
    :param log_file: KCL log file; if empty and logging is requested, a temporary one is created
    :param env: environment specifier, resolved via `aws_stack.get_environment`
    :param configs: optional dict of extra KCL config options; overrides the defaults set here
    :param endpoint_url: forwarded to `get_stream_info`
    :param ddb_lease_table_suffix: forwarded to `get_stream_info`
    :param env_vars: optional dict of environment variables for the child process. A dict
        passed by the caller is updated in place (as before); when omitted, a fresh dict is
        created per call instead of sharing one default object across calls.
    :param kcl_log_level: log level passed to the log reader thread
    :param log_subscribers: optional list of subscribers passed to the log reader thread
    :return: the consumer thread returned by `KinesisProcessorThread.start_consumer`
    """
    # Fresh containers per call: the previous `{}` / `[]` defaults were single shared
    # objects, so entries written into `env_vars` below leaked into later calls.
    configs = {} if configs is None else configs
    env_vars = {} if env_vars is None else env_vars
    log_subscribers = [] if log_subscribers is None else log_subscribers

    env = aws_stack.get_environment(env)
    # decide which credentials provider to use
    credentialsProvider = None
    if (('AWS_ASSUME_ROLE_ARN' in os.environ or 'AWS_ASSUME_ROLE_ARN' in env_vars) and
            ('AWS_ASSUME_ROLE_SESSION_NAME' in os.environ or 'AWS_ASSUME_ROLE_SESSION_NAME' in env_vars)):
        # use special credentials provider that can assume IAM roles and handle temporary STS auth tokens
        credentialsProvider = 'com.atlassian.DefaultSTSAssumeRoleSessionCredentialsProvider'
        # pass through env variables to child process
        for var_name in ['AWS_ASSUME_ROLE_ARN', 'AWS_ASSUME_ROLE_SESSION_NAME',
                'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_SESSION_TOKEN']:
            if var_name in os.environ and var_name not in env_vars:
                env_vars[var_name] = os.environ[var_name]
    if env.region == REGION_LOCAL:
        # need to disable CBOR protocol, enforce use of plain JSON,
        # see https://github.com/mhart/kinesalite/issues/31
        env_vars['AWS_CBOR_DISABLE'] = 'true'
    if kcl_log_level or (len(log_subscribers) > 0):
        if not log_file:
            log_file = LOG_FILE_PATTERN.replace('*', short_uid())
            TMP_FILES.append(log_file)
        run('touch %s' % log_file)
        # start log output reader thread which will read the KCL log
        # file and print each line to stdout of this process...
        reader_thread = OutputReaderThread({'file': log_file, 'level': kcl_log_level,
            'log_prefix': 'KCL', 'log_subscribers': log_subscribers})
        reader_thread.start()
    # construct stream info
    stream_info = get_stream_info(stream_name, log_file, env=env, endpoint_url=endpoint_url,
        ddb_lease_table_suffix=ddb_lease_table_suffix, env_vars=env_vars)
    props_file = stream_info['properties_file']
    # set kcl config options
    kwargs = {
        'metricsLevel': 'NONE',
        'initialPositionInStream': 'LATEST'
    }
    # set parameters for local connection
    if env.region == REGION_LOCAL:
        kwargs['kinesisEndpoint'] = '%s:%s' % (HOSTNAME, config.PORT_KINESIS)
        kwargs['dynamodbEndpoint'] = '%s:%s' % (HOSTNAME, config.PORT_DYNAMODB)
        kwargs['kinesisProtocol'] = 'http%s' % ('s' if USE_SSL else '')
        kwargs['dynamodbProtocol'] = 'http%s' % ('s' if USE_SSL else '')
        kwargs['disableCertChecking'] = 'true'
    # caller-supplied options win over the defaults above
    kwargs.update(configs)
    # create config file
    kclipy_helper.create_config_file(config_file=props_file, executableName=listener_script,
        streamName=stream_name, applicationName=stream_info['app_name'],
        credentialsProvider=credentialsProvider, **kwargs)
    TMP_FILES.append(props_file)
    # start stream consumer
    stream = KinesisStream(id=stream_name, params=stream_info)
    thread_consumer = KinesisProcessorThread.start_consumer(stream)
    TMP_THREADS.append(thread_consumer)
    return thread_consumer


def generate_processor_script(events_file, log_file=None):
    """
    Generate an executable Python script that forwards KCL records to a UNIX socket.

    The script connects to the socket at `events_file` and sends each batch of
    records as one JSON line. The script file is created in the temp directory,
    made executable and registered in TMP_FILES.

    :param events_file: path of the UNIX socket the generated script connects to
    :param log_file: optional log file path, embedded in the script as a quoted
        string literal (or as `None` when not given)
    :return: path of the generated script file
    """
    script_file = os.path.join(tempfile.gettempdir(), 'kclipy.%s.processor.py' % short_uid())
    # render the value as Python source, since it is pasted into the template verbatim
    if log_file:
        log_file = "'%s'" % log_file
    else:
        log_file = 'None'
    # NOTE(review): inside the template, `if i < num_tries` is always true for
    # `i in range(0, num_tries)`, so the "Retrying" message and the sleep also
    # happen after the final failed attempt. Left unchanged here.
    content = """#!/usr/bin/env python
import os, sys, glob, json, socket, time, logging, tempfile
import subprocess32 as subprocess
logging.basicConfig(level=logging.INFO)
for path in glob.glob('%s/lib/python*/site-packages'):
    sys.path.insert(0, path)
sys.path.insert(0, '%s')
from localstack.config import DEFAULT_ENCODING
from localstack.utils.kinesis import kinesis_connector
from localstack.utils.common import timestamp
events_file = '%s'
log_file = %s
error_log = os.path.join(tempfile.gettempdir(), 'kclipy.error.log')
if __name__ == '__main__':
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    num_tries = 3
    sleep_time = 2
    error = None
    for i in range(0, num_tries):
        try:
            sock.connect(events_file)
            error = None
            break
        except Exception as e:
            error = e
            if i < num_tries:
                msg = '%%s: Unable to connect to UNIX socket. Retrying.' %% timestamp()
                subprocess.check_output('echo "%%s" >> %%s' %% (msg, error_log), shell=True)
                time.sleep(sleep_time)
    if error:
        print("WARN: Unable to connect to UNIX socket after retrying: %%s" %% error)
        raise error
    def receive_msg(records, checkpointer, shard_id):
        try:
            # records is a list of amazon_kclpy.messages.Record objects -> convert to JSON
            records_dicts = [j._json_dict for j in records]
            message_to_send = {'shard_id': shard_id, 'records': records_dicts}
            string_to_send = '%%s\\n' %% json.dumps(message_to_send)
            bytes_to_send = string_to_send.encode(DEFAULT_ENCODING)
            sock.send(bytes_to_send)
        except Exception as e:
            msg = "WARN: Unable to forward event: %%s" %% e
            print(msg)
            subprocess.check_output('echo "%%s" >> %%s' %% (msg, error_log), shell=True)
    kinesis_connector.KinesisProcessor.run_processor(log_file=log_file, processor_func=receive_msg)
    """ % (LOCALSTACK_VENV_FOLDER, LOCALSTACK_ROOT_FOLDER, events_file, log_file)
    save_file(script_file, content)
    chmod_r(script_file, 0o755)
    TMP_FILES.append(script_file)
    return script_file


def listen_to_kinesis(stream_name, listener_func=None, processor_script=None,
        events_file=None, endpoint_url=None, log_file=None, configs=None, env=None,
        ddb_lease_table_suffix=None, env_vars=None, kcl_log_level=DEFAULT_KCL_LOG_LEVEL,
        log_subscribers=None, wait_until_started=False, fh_d_stream=None):
    """
    High-level function that allows to subscribe to a Kinesis stream
    and receive events in a listener function. A KCL client process is
    automatically started in the background.
    """
    # Fresh containers per call: with the previous `[]` default, every call using
    # wait_until_started=True appended another listener to the same shared list.
    configs = {} if configs is None else configs
    env_vars = {} if env_vars is None else env_vars
    log_subscribers = [] if log_subscribers is None else log_subscribers

    env = aws_stack.get_environment(env)
    if not events_file:
        events_file = EVENTS_FILE_PATTERN.replace('*', short_uid())
        TMP_FILES.append(events_file)
    if not processor_script:
        processor_script = generate_processor_script(events_file, log_file=log_file)
    run('rm -f %s' % events_file)
    # start event reader thread (this process)
    ready_mutex = threading.Semaphore(0)
    thread = EventFileReaderThread(events_file, listener_func, ready_mutex=ready_mutex, fh_d_stream=fh_d_stream)
    thread.start()
    # Wait until the event reader thread is ready (to avoid 'Connection refused' error on the UNIX socket)
    ready_mutex.acquire()
    # start KCL client (background process)
    if processor_script[-4:] == '.pyc':
        processor_script = processor_script[0:-1]
    # add log listener that notifies when KCL is started
    if wait_until_started:
        listener = KclStartedLogListener()
        log_subscribers.append(listener)
    process = start_kcl_client_process(stream_name, processor_script,
        endpoint_url=endpoint_url, log_file=log_file, configs=configs, env=env,
        ddb_lease_table_suffix=ddb_lease_table_suffix, env_vars=env_vars, kcl_log_level=kcl_log_level,
        log_subscribers=log_subscribers)
    if wait_until_started:
        # Wait at most 90 seconds for initialization. Note that creating the DDB table can take quite a bit
        try:
            listener.sync_init.get(block=True, timeout=90)
        except Exception:
            raise Exception('Timeout when waiting for KCL initialization.')
        # wait at most 30 seconds for shard lease notification
        try:
            listener.sync_take_shard.get(block=True, timeout=30)
        except Exception:
            # this merely means that there is no shard available to take. Do nothing.
            # (the excerpt is cut off here; the rest of the function is not shown)
            ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful