Best Python code snippet using localstack_python
tablestore_streamreader_console.py
Source:tablestore_streamreader_console.py  
...15        self.accesskey = str(config['accessKey'])16        self.instance_name = str(config['instanceName'])17        self.status_table = str(config['statusTable'])18        self.ots = OTSClient(self.endpoint, self.accessid, self.accesskey, self.instance_name)19def describe_job(config, options):20    '''21        1. get job's description22        2. get all job's checkpoints and check if it is done23    '''24    if not options.stream_id:25        print "Error: Should set the stream id using '-s' or '--streamid'."26        sys.exit(-1)27    if not options.timestamp:28        print "Error: Should set the timestamp using '-t' or '--timestamp'."29        sys.exit(-1)30    pk = [('StreamId', options.stream_id), ('StatusType', 'DataxJobDesc'), ('StatusValue', '%16d' % int(options.timestamp))]31    consumed, pk, attrs, next_token = config.ots.get_row(config.status_table, pk, [], None, 1)32    if not attrs:33        print 'Stream job is not found.'34        sys.exit(-1)35    job_detail = parse_job_detail(attrs)36    print '----------JobDescriptions----------'37    print json.dumps(job_detail, indent=2)38    print '-----------------------------------'39    stream_checkpoints = _list_checkpoints(config, options.stream_id, int(options.timestamp))40    cps_headers = ['ShardId', 'SendRecordCount', 'Checkpoint', 'SkipCount', 'Version']41    table_content = []42    for cp in stream_checkpoints:43        table_content.append([cp['ShardId'], cp['SendRecordCount'], cp['Checkpoint'], cp['SkipCount'], cp['Version']])44    print tabulate.tabulate(table_content, headers=cps_headers)45    # check if stream job has finished46    finished = True47    if len(job_detail['ShardIds']) != len(stream_checkpoints):48        finished = False49    for cp in stream_checkpoints:50        if cp['Version'] != job_detail['Version']:51            finished = False52    print '----------JobSummary----------'53    print 'ShardsCount:', len(job_detail['ShardIds'])54    print 'CheckPointsCount:', len(stream_checkpoints)55    print 'JobStatus:', 'Finished' if finished else 'NotFinished'56    print '------------------------------'57def _list_checkpoints(config, stream_id, timestamp):58    start_pk = [('StreamId', stream_id), ('StatusType', 'CheckpointForDataxReader'), ('StatusValue', '%16d' % timestamp)]59    end_pk = [('StreamId', stream_id), ('StatusType', 'CheckpointForDataxReader'), ('StatusValue', '%16d' % (timestamp + 1))]60    consumed_counter = CapacityUnit(0, 0)61    columns_to_get = []62    checkpoints = []63    range_iter = config.ots.xget_range(64                config.status_table, Direction.FORWARD,65                start_pk, end_pk,66                consumed_counter, columns_to_get, 100,67                column_filter=None, max_version=168    )69    rows = []70    for (primary_key, attrs) in range_iter:71        checkpoint = {}72        for attr in attrs:73            checkpoint[attr[0]] = attr[1]74        if not checkpoint.has_key('SendRecordCount'):75            checkpoint['SendRecordCount'] = 076        checkpoint['ShardId'] = primary_key[2][1].split('\t')[1]77        checkpoints.append(checkpoint)78    return checkpoints79def list_job(config, options):80    '''81        Two options:82            1. list all jobs of stream83            2. list all jobs and all streams84    '''85    consumed_counter = CapacityUnit(0, 0)86    if options.stream_id:87        start_pk = [('StreamId', options.stream_id), ('StatusType', INF_MIN), ('StatusValue', INF_MIN)]88        end_pk = [('StreamId', options.stream_id), ('StatusType', INF_MAX), ('StatusValue', INF_MAX)]89    else:90        start_pk = [('StreamId', INF_MIN), ('StatusType', INF_MIN), ('StatusValue', INF_MIN)]91        end_pk = [('StreamId', INF_MAX), ('StatusType', INF_MAX), ('StatusValue', INF_MAX)]92    columns_to_get = []93    range_iter = config.ots.xget_range(94                config.status_table, Direction.FORWARD,95                start_pk, end_pk,96                consumed_counter, columns_to_get, None,97                column_filter=None, max_version=198    )99    rows = []100    for (primary_key, attrs) in range_iter:101        if primary_key[1][1] == 'DataxJobDesc':102            job_detail = parse_job_detail(attrs)103            rows.append([job_detail['TableName'], job_detail['JobStreamId'], job_detail['EndTime'], job_detail['StartTime'], job_detail['EndTime'], job_detail['Version']])104    headers = ['TableName', 'JobStreamId', 'Timestamp', 'StartTime', 'EndTime', 'Version']105    print tabulate.tabulate(rows, headers=headers)106def parse_job_detail(attrs):107    job_details = {}108    shard_ids_content = ''109    for attr in attrs:110        if attr[0].startswith('ShardIds_'):111            shard_ids_content += attr[1]112        else:113            job_details[attr[0]] = attr[1]114    shard_ids = json.loads(zlib.decompress(shard_ids_content))115    if not job_details.has_key('Version'):116        job_details['Version'] = ''117    if not job_details.has_key('SkipCount'):118        job_details['SkipCount'] = 0119    job_details['ShardIds'] = shard_ids120    return job_details121def parse_time(value):122    try:123        return int(value)124    except Exception,e:125        return int(time.mktime(time.strptime(value, '%Y-%m-%d %H:%M:%S')))126if __name__ == '__main__':127    parser = OptionParser()128    parser.add_option('-c', '--config', dest='config_file', help='path of config file', metavar='tablestore_streamreader_config.json')129    parser.add_option('-a', '--action', dest='action', help='the action to do', choices = ['describe_job', 'list_job'], metavar='')130    parser.add_option('-t', '--timestamp', dest='timestamp', help='the timestamp', metavar='')131    parser.add_option('-s', '--streamid', dest='stream_id', help='the id of stream', metavar='')132    parser.add_option('-d', '--shardid', dest='shard_id', help='the id of shard', metavar='')133    options, args = parser.parse_args()134    if not options.config_file:135        print "Error: Should set the path of config file using '-c' or '--config'."136        sys.exit(-1)137    if not options.action:138        print "Error: Should set the action using '-a' or '--action'."139        sys.exit(-1)140    console_config = ConsoleConfig(options.config_file)141    if options.action == 'list_job':142        list_job(console_config, options)143    elif options.action == 'describe_job':...describe_job.py
Source:describe_job.py  
1import logging2import boto33from botocore.exceptions import ClientError4def describe_job(vault_name, job_id):5    """Retrieve the status of an Amazon S3 Glacier job, such as an6    inventory-retrieval job7    To retrieve the output of the finished job, call Glacier.Client.get_job_output()8    :param vault_name: string9    :param job_id: string. The job ID was returned by Glacier.Client.initiate_job().10    :return: Dictionary of information related to the job. If error, return None.11    """12    # Retrieve the status of the job13    glacier = boto3.client('glacier')14    try:15        response = glacier.describe_job(vaultName=vault_name, jobId=job_id)16    except ClientError as e:17        logging.error(e)18        return None19    return response20def main():21    """Exercise describe_job()"""22    # Assign the following values before running the program23    test_vault_name = 'VAULT_NAME'24    test_job_id = 'JOB_ID'25    # Set up logging26    logging.basicConfig(level=logging.DEBUG,27                        format='%(levelname)s: %(asctime)s: %(message)s')28    # Retrieve the job's status29    response = describe_job(test_vault_name, test_job_id)30    if response is not None:31        logging.info(f'Job Type: {response["Action"]}, '32                     f'Status: {response["StatusCode"]}')33if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
