Best Python code snippet using localstack_python
kinesis_connector.py
Source:kinesis_connector.py  
...230        stream_info['conn_kwargs']['host'] = url.hostname231        stream_info['conn_kwargs']['port'] = url.port232        stream_info['conn_kwargs']['is_secure'] = url.scheme == 'https'233    return stream_info234def start_kcl_client_process(stream_name, listener_script, log_file=None, env=None, configs={},235        endpoint_url=None, ddb_lease_table_suffix=None, env_vars={},236        kcl_log_level=DEFAULT_KCL_LOG_LEVEL, log_subscribers=[]):237    env = aws_stack.get_environment(env)238    # decide which credentials provider to use239    credentialsProvider = None240    if (('AWS_ASSUME_ROLE_ARN' in os.environ or 'AWS_ASSUME_ROLE_ARN' in env_vars) and241            ('AWS_ASSUME_ROLE_SESSION_NAME' in os.environ or 'AWS_ASSUME_ROLE_SESSION_NAME' in env_vars)):242        # use special credentials provider that can assume IAM roles and handle temporary STS auth tokens243        credentialsProvider = 'com.atlassian.DefaultSTSAssumeRoleSessionCredentialsProvider'244        # pass through env variables to child process245        for var_name in ['AWS_ASSUME_ROLE_ARN', 'AWS_ASSUME_ROLE_SESSION_NAME',246                'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', 'AWS_SESSION_TOKEN']:247            if var_name in os.environ and var_name not in env_vars:248                env_vars[var_name] = os.environ[var_name]249    if env.region == REGION_LOCAL:250        # need to disable CBOR protocol, enforce use of plain JSON,251        # see https://github.com/mhart/kinesalite/issues/31252        env_vars['AWS_CBOR_DISABLE'] = 'true'253    if kcl_log_level or (len(log_subscribers) > 0):254        if not log_file:255            log_file = LOG_FILE_PATTERN.replace('*', short_uid())256            TMP_FILES.append(log_file)257        run('touch %s' % log_file)258        # start log output reader thread which will read the KCL log259        # file and print each line to stdout of this process...260        reader_thread = OutputReaderThread({'file': log_file, 'level': 
kcl_log_level,261            'log_prefix': 'KCL', 'log_subscribers': log_subscribers})262        reader_thread.start()263    # construct stream info264    stream_info = get_stream_info(stream_name, log_file, env=env, endpoint_url=endpoint_url,265        ddb_lease_table_suffix=ddb_lease_table_suffix, env_vars=env_vars)266    props_file = stream_info['properties_file']267    # set kcl config options268    kwargs = {269        'metricsLevel': 'NONE',270        'initialPositionInStream': 'LATEST'271    }272    # set parameters for local connection273    if env.region == REGION_LOCAL:274        kwargs['kinesisEndpoint'] = '%s:%s' % (HOSTNAME, config.PORT_KINESIS)275        kwargs['dynamodbEndpoint'] = '%s:%s' % (HOSTNAME, config.PORT_DYNAMODB)276        kwargs['kinesisProtocol'] = 'http%s' % ('s' if USE_SSL else '')277        kwargs['dynamodbProtocol'] = 'http%s' % ('s' if USE_SSL else '')278        kwargs['disableCertChecking'] = 'true'279    kwargs.update(configs)280    # create config file281    kclipy_helper.create_config_file(config_file=props_file, executableName=listener_script,282        streamName=stream_name, applicationName=stream_info['app_name'],283        credentialsProvider=credentialsProvider, **kwargs)284    TMP_FILES.append(props_file)285    # start stream consumer286    stream = KinesisStream(id=stream_name, params=stream_info)287    thread_consumer = KinesisProcessorThread.start_consumer(stream)288    TMP_THREADS.append(thread_consumer)289    return thread_consumer290def generate_processor_script(events_file, log_file=None):291    script_file = os.path.join(tempfile.gettempdir(), 'kclipy.%s.processor.py' % short_uid())292    if log_file:293        log_file = "'%s'" % log_file294    else:295        log_file = 'None'296    content = """#!/usr/bin/env python297import os, sys, glob, json, socket, time, logging, tempfile298import subprocess32 as subprocess299logging.basicConfig(level=logging.INFO)300for path in 
glob.glob('%s/lib/python*/site-packages'):301    sys.path.insert(0, path)302sys.path.insert(0, '%s')303from localstack.config import DEFAULT_ENCODING304from localstack.utils.kinesis import kinesis_connector305from localstack.utils.common import timestamp306events_file = '%s'307log_file = %s308error_log = os.path.join(tempfile.gettempdir(), 'kclipy.error.log')309if __name__ == '__main__':310    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)311    num_tries = 3312    sleep_time = 2313    error = None314    for i in range(0, num_tries):315        try:316            sock.connect(events_file)317            error = None318            break319        except Exception as e:320            error = e321            if i < num_tries:322                msg = '%%s: Unable to connect to UNIX socket. Retrying.' %% timestamp()323                subprocess.check_output('echo "%%s" >> %%s' %% (msg, error_log), shell=True)324                time.sleep(sleep_time)325    if error:326        print("WARN: Unable to connect to UNIX socket after retrying: %%s" %% error)327        raise error328    def receive_msg(records, checkpointer, shard_id):329        try:330            # records is a list of amazon_kclpy.messages.Record objects -> convert to JSON331            records_dicts = [j._json_dict for j in records]332            message_to_send = {'shard_id': shard_id, 'records': records_dicts}333            string_to_send = '%%s\\n' %% json.dumps(message_to_send)334            bytes_to_send = string_to_send.encode(DEFAULT_ENCODING)335            sock.send(bytes_to_send)336        except Exception as e:337            msg = "WARN: Unable to forward event: %%s" %% e338            print(msg)339            subprocess.check_output('echo "%%s" >> %%s' %% (msg, error_log), shell=True)340    kinesis_connector.KinesisProcessor.run_processor(log_file=log_file, processor_func=receive_msg)341    """ % (LOCALSTACK_VENV_FOLDER, LOCALSTACK_ROOT_FOLDER, events_file, log_file)342    
save_file(script_file, content)343    chmod_r(script_file, 0o755)344    TMP_FILES.append(script_file)345    return script_file346def listen_to_kinesis(stream_name, listener_func=None, processor_script=None,347        events_file=None, endpoint_url=None, log_file=None, configs={}, env=None,348        ddb_lease_table_suffix=None, env_vars={}, kcl_log_level=DEFAULT_KCL_LOG_LEVEL,349        log_subscribers=[], wait_until_started=False, fh_d_stream=None):350    """351    High-level function that allows to subscribe to a Kinesis stream352    and receive events in a listener function. A KCL client process is353    automatically started in the background.354    """355    env = aws_stack.get_environment(env)356    if not events_file:357        events_file = EVENTS_FILE_PATTERN.replace('*', short_uid())358        TMP_FILES.append(events_file)359    if not processor_script:360        processor_script = generate_processor_script(events_file, log_file=log_file)361    run('rm -f %s' % events_file)362    # start event reader thread (this process)363    ready_mutex = threading.Semaphore(0)364    thread = EventFileReaderThread(events_file, listener_func, ready_mutex=ready_mutex, fh_d_stream=fh_d_stream)365    thread.start()366    # Wait until the event reader thread is ready (to avoid 'Connection refused' error on the UNIX socket)367    ready_mutex.acquire()368    # start KCL client (background process)369    if processor_script[-4:] == '.pyc':370        processor_script = processor_script[0:-1]371    # add log listener that notifies when KCL is started372    if wait_until_started:373        listener = KclStartedLogListener()374        log_subscribers.append(listener)375    process = start_kcl_client_process(stream_name, processor_script,376        endpoint_url=endpoint_url, log_file=log_file, configs=configs, env=env,377        ddb_lease_table_suffix=ddb_lease_table_suffix, env_vars=env_vars, kcl_log_level=kcl_log_level,378        log_subscribers=log_subscribers)379    if 
wait_until_started:380        # Wait at most 90 seconds for initialization. Note that creating the DDB table can take quite a bit381        try:382            listener.sync_init.get(block=True, timeout=90)383        except Exception:384            raise Exception('Timeout when waiting for KCL initialization.')385        # wait at most 30 seconds for shard lease notification386        try:387            listener.sync_take_shard.get(block=True, timeout=30)388        except Exception:389            # this merely means that there is no shard available to take. Do nothing....Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
