Best Python code snippet using slash
ecs.py
Source:ecs.py  
...3import hashlib4import time5from aws_ecs_remote.cloudwatch import follow_log_events6import warnings7def get_log_paths(containers, container_definitions):8    cdefs = {9        cdef['name']: cdef10        for cdef11        in container_definitions12    }13    ldefs = []14    for container in containers:15        name = container['name']16        cdef = cdefs[name]17        driver = cdef['logConfiguration']['logDriver']18        if driver == 'awslogs':19            log_group = cdef['logConfiguration']['options']['awslogs-group']20            log_prefix = cdef['logConfiguration']['options']['awslogs-stream-prefix']21            taskId = container['taskArn'].split('/')[-1]22            ldefs.append({23                'name': name,24                'group': log_group,25                'prefix': log_prefix,26                'taskId': taskId,27                'stream': '{}/{}/{}'.format(log_prefix, name, taskId)28            })29        else:30            warnings.warn("Container [{}] uses log driver [{}] but should be using [awslogs]".format(31                name, driver32            ))33    return ldefs34def tasks_waiter(ecs, cluster, task_arns, poll_sec=6):35    def waiter():36        done = tasks_stopped(ecs=ecs, cluster=cluster, task_arns=task_arns)37        if not done:38            time.sleep(poll_sec)39        return done40    return waiter41def tasks_stopped(ecs, cluster, task_arns):42    response = ecs.describe_tasks(43        cluster=cluster,44        tasks=task_arns45    )46    tasks = response['tasks']47    for task in tasks:48        if task['lastStatus'] != 'STOPPED':49            return False50    return True51def ecs_run_task(52        ecs, cluster, task_definition, command, subnets, security_groups, launch_type=LaunchType.FARGATE,53        platform_version='1.4.0', container_name=CONTAINER_NAME,54        assign_public_ip='ENABLED',55        started_by='aws-ecs-remote', group='aws-ecs-remote-group'):56    response = ecs.run_task(57        cluster=cluster,58        taskDefinition=task_definition,59        overrides={60            'containerOverrides': [61                {62                    'name': container_name,63                    'command': command64                },65            ]66        },67        count=1,68        startedBy=started_by,69        group=group,70        launchType=launch_type,71        platformVersion=platform_version,72        networkConfiguration={73            'awsvpcConfiguration': {74                'subnets': subnets,75                'securityGroups': security_groups,76                'assignPublicIp': assign_public_ip77            }78        })79    tasks = response['tasks']80    assert len(tasks) == 181    task = tasks[0]82    return task83if __name__ == "__main__":84    import boto385    session = boto3.Session()86    ecs = session.client('ecs')87    cluster = 'arn:aws:ecs:us-east-1:683880991063:cluster/columbo-fargate-cluster'88    task = ecs_run_task(89        ecs=ecs,90        cluster=cluster,91        task_definition='fargate-task-definition',92        command=['echo test aws-ecs-remote'],93        subnets=['subnet-0c48b4dd667d2867e'],94        security_groups=['sg-08b550b061776586c'],95        launch_type=LaunchType.FARGATE,96        platform_version='1.4.0',97        assign_public_ip='ENABLED',98        started_by='aws-ecs-remote',99        group='aws-ecs-remote-group')100    task_definition = describe_task_definition(101        ecs=ecs,102        task_definition=task['taskDefinitionArn']103    )104    log_streams = get_log_paths(105        task['containers'], task_definition['containerDefinitions']106    )107    waiter = tasks_waiter(108        ecs=ecs,109        cluster=task['clusterArn'],110        task_arns=[task['taskArn']]111    )112    logs = session.client('logs')113    follow_log_events(114        logs=logs, log_streams=log_streams, waiter=waiter...plot.py
Source:plot.py  
...10  with open(filename) as f:11    for line in f:12      rv.append(line.strip())13  return rv14def get_log_paths(dirname):15  files = get_files(dirname)16  rv = []17  for filename in files:18    path = os.path.join(dirname, filename)19    if path.startswith(dirname + '/' + 'asan0'):20      rv.append(path)21  return rv22def log_path_to_time(path):23  return path.split('-')[-1].split('.')[-2]24def get_log_paths_times(paths):25  rv = {}26  for path in paths:27    rv[path] = int(log_path_to_time(path))28  return rv29def normalize_log_paths_times(paths_times):30  m = min(paths_times.items(), key=lambda x: x[1])[1]31  for key in paths_times.keys():32    paths_times[key] -= m33  return paths_times34def log_line_to_time(line):35  rv = None36  try:37    rv = int(line.split(']')[0].lstrip('[ ').split('.')[0])38  except:39    return None40  return rv41def get_race_time(log_line, races):42  for race in races:43    if race in log_line:44      time = log_line_to_time(log_line)45      if time != None:46	return time47  return None48def get_races_times(log_path, races):49  rv = []50  with open(log_path) as f:51    for line in f:52      line = line.strip()53      time = get_race_time(line, races)54      if time != None:55        rv.append(time)56  return rv57def get_all_races_times(dirname, races):58  log_paths = get_log_paths(dirname)59  log_paths_times = normalize_log_paths_times(get_log_paths_times(log_paths))60  all_times = []61  for path in log_paths:62    times = get_races_times(path, races)63    times = [x + log_paths_times[path] for x in times]64    all_times += times65  all_times.sort()66  return all_times67def generate_step_plot_ranges(steps):68  x_max = max(steps) + 169  x = range(x_max)70  y = []71  v = 072  for i in xrange(x_max):...core.py
Source:core.py  
...12        self.runno = runno13        self.init_from_config()14        if self.sessionPath is None:15            raise FileNotFoundError('Session directory {0} does not exist!'.format(self.sessionPath))16        self.get_log_paths()17        self.data = pjoin(self.savePath,'sessionData.csv').replace("\\",os.sep)18    def __repr__(self) -> str:19        kws = [f'{key}={value!r}' for key, value in self.__dict__.items()]20        return '{}({})'.format(type(self).__name__, ', '.join(kws))21    def init_from_config(self) -> None:22        self.config = getConfig()23        for k, v in self.config.items():24            session_path = pjoin(v,self.sessiondir).replace("\\",os.sep)25            setattr(self, k, session_path)26            # get where the raw data is either in training or presentation27            if 'presentation' in k or 'training' in k:28                if os.path.exists(session_path):29                    display(f'Found session rawdata at {v}')30                    self.sessionPath = session_path31            if 'analysis' in k:32                self.savePath = session_path33    def get_log_paths(self) -> None:34        for s_file in os.listdir(self.sessionPath):35            extension = os.path.splitext(s_file)[1]36            temp_key = extension.split('.')[-1]37            log_path = pjoin(self.sessionPath,s_file).replace("\\",os.sep)38            setattr(self, temp_key, log_path)39            if self.runno is not None:40                if self.runno in s_file:41                    log_path = pjoin(self.sessionPath,s_file).replace("\\","/")42                    setattr(self, temp_key, log_path)43            else:44                log_path = pjoin(self.sessionPath,s_file).replace("\\","/")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
