Best Python code snippet using localstack_python
jobs.py
Source:jobs.py  
...70        except botocore.exceptions.ClientError as error:71            if error.response["Error"]["Code"] != "NoSuchKey":72                raise error73        self.state = {key: Job.parse_obj(val) for key, val in read_state.items()}74    def _persist_state(self) -> None:75        """Persist the jobs state to S3"""76        state_dict = {key: val.dict() for key, val in self.state.items()}77        serialized = bytes(json.dumps(state_dict, cls=DateTimeEncoder), "utf-8")78        self.s3_client.put_object(Bucket=BUCKET_NAME, Key=JOBS_STATE_FILE, Body=serialized)79    def _generate_id(self) -> str:80        """Generate a unique Job ID"""81        i = 082        while i < MAX_TRIES:83            job_id = "".join(random.choices(string.ascii_lowercase, k=4))84            if job_id not in self.state:85                return job_id86            i += 187        raise Exception("Unable to generate ID.")88    def _create_batch_jobs(self, job_id: str, start_frame: int, end_frame: int, gpu: bool) -> Dict[int, BatchJob]:89        """Create batch jobs for a render job"""90        batch_jobs = {}91        # Pick job definition and queue92        job_def, job_queue = JOB_DEF_CPU, JOB_QUEUE_CPU93        if gpu:94            job_def, job_queue = JOB_DEF_GPU, JOB_QUEUE_GPU95        # Create a batch job for each frame96        for frame_num in range(start_frame, end_frame + 1):97            name = f"render-job-{job_id}-frame-{frame_num}"98            params = dict(frame=str(frame_num), job=job_id)99            retry_strategy = dict(100                attempts=3,101                evaluateOnExit=[102                    dict(onStatusReason="Host EC2*", action="RETRY"),103                    dict(onReason="*", action="EXIT"),104                ],105            )106            # Make request107            response = self.batch_client.submit_job(108                jobName=name,109                jobQueue=job_queue,110                jobDefinition=job_def,111                
parameters=params,112                retryStrategy=retry_strategy,113            )114            # Create pydantic model115            batch_job = BatchJob(batch_id=response["jobId"], name=response["jobName"], frame=frame_num)116            batch_jobs[frame_num] = batch_job117            typer.echo(f"Created batch job {batch_job.name}")118        return batch_jobs119    def create_job(self, blend_path: str, start_frame: int, end_frame: int, gpu: bool = False) -> Job:120        """121        Create a new job. That process composes of several steps:122        1. Generate a unique ID123        2. Upload blend file to S3124        3. Create AWS Batch jobs125        :param blend_path: Path to the blend file to upload.126        """127        # Reload state128        self._load_state()129        # Generate ID130        typer.echo("Generating unique ID...")131        job_id = self._generate_id()132        # Upload blend file to S3133        typer.echo("Uploading blend file to S3...")134        obj_key = f"jobs/{job_id}/main.blend"135        self.s3_client.upload_file(Filename=blend_path, Bucket=BUCKET_NAME, Key=obj_key)136        # Create AWS Batch jobs137        typer.echo("Creating batch jobs...")138        batch_jobs = self._create_batch_jobs(job_id, start_frame, end_frame, gpu)139        # Create pydantic model140        job = Job(141            job_id=job_id,142            creation_date=datetime.now(),143            children=batch_jobs,144            start_frame=start_frame,145            end_frame=end_frame,146            gpu=gpu,147            file_name=Path(blend_path).name,148            status=STATUS_RUNNING,149        )150        self.state[job.job_id] = job151        # Persist state152        self._persist_state()153        typer.echo(f"Created render job with id: {job.job_id}")154        return job155    def list_jobs(self) -> List[Job]:156        """List available jobs in descending order of their creation date"""157        # Reload state158        
self._load_state()159        # Refresh job statuses160        for job in self.state.values():161            if job.status not in (STATUS_SUCCEEDED, STATUS_ERROR):162                self.state[job.job_id] = self._refresh_job(job)163        # Persist state164        self._persist_state()165        # Parse jobs166        jobs = list(self.state.values())167        jobs = sorted(jobs, key=lambda job: job.creation_date, reverse=True)168        return jobs169    def _refresh_children(self, children: Dict[int, BatchJob]) -> Dict[int, BatchJob]:170        """Refresh a set of batch jobs by querying the AWS API"""171        job_ids = [job.batch_id for job in children.values()]172        id_maps = {val.batch_id: key for key, val in children.items()}173        # Iterate over chunks of 100 jobs maximum (limited by AWS)174        cur_chunk, chunk_size = 0, 100175        while cur_chunk < (len(children) // chunk_size) + 1:176            chunk_start, chunk_end = (177                cur_chunk * chunk_size,178                (cur_chunk + 1) * chunk_size,179            )180            # Request batch of jobs181            response = self.batch_client.describe_jobs(jobs=job_ids[chunk_start:chunk_end])182            # Parse results183            for obj in response["jobs"]:184                children[id_maps[obj["jobId"]]].status = obj["status"]185                # Check if startedAt is set186                if "startedAt" in obj:187                    unix_ts = int(str(obj["startedAt"])[0:-3])188                    children[id_maps[obj["jobId"]]].started_at = datetime.fromtimestamp(unix_ts)189                if "stoppedAt" in obj:190                    unix_ts = int(str(obj["stoppedAt"])[0:-3])191                    children[id_maps[obj["jobId"]]].stopped_at = datetime.fromtimestamp(unix_ts)192            cur_chunk += 1193        return children194    def _refresh_job(self, job: Job) -> Job:195        """Refresh a job by querying its children and checking it's status"""196        # 
Refresh children197        job.children = self._refresh_children(job.children)198        # Determine parent job's status199        completion_date = None200        running = False201        ran_into_error = False202        for batch_job in job.children.values():203            # Check if error encountered204            if batch_job.status == "FAILED":205                ran_into_error = True206            # Check if job is still running207            if batch_job.status in (208                "SUBMITTED",209                "PENDING",210                "RUNNABLE",211                "STARTING",212                "RUNNING",213            ):214                running = True215            # Set completion time216            if batch_job.stopped_at is not None:217                if completion_date is None or completion_date < batch_job.stopped_at:218                    completion_date = batch_job.stopped_at219        if not running and not ran_into_error:220            job.status = STATUS_SUCCEEDED221        elif not running and ran_into_error:222            job.status = STATUS_ERROR223        else:224            job.status = STATUS_RUNNING225        job.completion_date = completion_date226        return job227    def get_job(self, job_id: str) -> Optional[Job]:228        """Fetch a job from state by its job ID"""229        # Refresh state230        self._load_state()231        # Return None if not found232        if job_id not in self.state:233            return None234        job = self.state[job_id]235        # Update job236        job = self._refresh_job(job)237        # Persist state238        self.state[job_id] = job239        self._persist_state()240        return job241    def delete_job(self, job: Job) -> None:242        """Cancel a job and remove it from the state"""243        # Refresh state244        self._load_state()245        # Cancel children246        typer.echo("Cancelling batch jobs...")247        for batch_job in job.children.values():248            
self.batch_client.cancel_job(jobId=batch_job.batch_id, reason="Canceled by user.")249        # Remove S3 directory250        typer.echo("Removing artifacts...")251        self.bucket.objects.filter(Prefix=f"jobs/{job.job_id}").delete()252        # Remove job from state253        typer.echo("Delete job from state...")254        del self.state[job.job_id]255        # Persist state256        self._persist_state()257    def sync_files(self, job: Job, output_path: str) -> None:258        """Sync a job's files to local disk."""259        # Fetch all objects belonging to a specific job260        job_path = f"jobs/{job.job_id}"261        job_objects = self.bucket.objects.filter(Prefix=job_path)262        # Download each non-blend object263        for obj in job_objects:264            if obj.key.split(".")[-1] != "blend":265                # Determine output path266                obj_subpath = obj.key.split(job_path + "/")[1]267                output_file = Path(output_path) / Path(obj_subpath)268                print(output_file)269                # Create directory if it does not exist270                if not os.path.exists(str(output_file.parent)):...state.py
Source:state.py  
...12                self._state = self._load_state()13        return self._state14    def save(self):15        with self.lock:16            self._persist_state()17            18    def _load_state(self):19        if not os.path.exists(self.filename):20            return {}21        with open(self.filename, 'rb') as f:22            return pickle.load(f)23        24    def _persist_state(self):25        with open(self.filename, 'wb') as f:26            pickle.dump(self._state, f)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
