How to use _persist_state method in localstack

Best Python code snippet using localstack_python

jobs.py

Source:jobs.py Github

copy

Full Screen

...70 except botocore.exceptions.ClientError as error:71 if error.response["Error"]["Code"] != "NoSuchKey":72 raise error73 self.state = {key: Job.parse_obj(val) for key, val in read_state.items()}74 def _persist_state(self) -> None:75 """Persist the jobs state to S3"""76 state_dict = {key: val.dict() for key, val in self.state.items()}77 serialized = bytes(json.dumps(state_dict, cls=DateTimeEncoder), "utf-8")78 self.s3_client.put_object(Bucket=BUCKET_NAME, Key=JOBS_STATE_FILE, Body=serialized)79 def _generate_id(self) -> str:80 """Generate a unique Job ID"""81 i = 082 while i < MAX_TRIES:83 job_id = "".join(random.choices(string.ascii_lowercase, k=4))84 if job_id not in self.state:85 return job_id86 i += 187 raise Exception("Unable to generate ID.")88 def _create_batch_jobs(self, job_id: str, start_frame: int, end_frame: int, gpu: bool) -> Dict[int, BatchJob]:89 """Create batch jobs for a render job"""90 batch_jobs = {}91 # Pick job definition and queue92 job_def, job_queue = JOB_DEF_CPU, JOB_QUEUE_CPU93 if gpu:94 job_def, job_queue = JOB_DEF_GPU, JOB_QUEUE_GPU95 # Create a batch job for each frame96 for frame_num in range(start_frame, end_frame + 1):97 name = f"render-job-{job_id}-frame-{frame_num}"98 params = dict(frame=str(frame_num), job=job_id)99 retry_strategy = dict(100 attempts=3,101 evaluateOnExit=[102 dict(onStatusReason="Host EC2*", action="RETRY"),103 dict(onReason="*", action="EXIT"),104 ],105 )106 # Make request107 response = self.batch_client.submit_job(108 jobName=name,109 jobQueue=job_queue,110 jobDefinition=job_def,111 parameters=params,112 retryStrategy=retry_strategy,113 )114 # Create pydantic model115 batch_job = BatchJob(batch_id=response["jobId"], name=response["jobName"], frame=frame_num)116 batch_jobs[frame_num] = batch_job117 typer.echo(f"Created batch job {batch_job.name}")118 return batch_jobs119 def create_job(self, blend_path: str, start_frame: int, end_frame: int, gpu: bool = False) -> Job:120 """121 Create a new job. That process composes of several steps:122 1. Generate a unique ID123 2. Upload blend file to S3124 3. Create AWS Batch jobs125 :param blend_path: Path to the blend file to upload.126 """127 # Reload state128 self._load_state()129 # Generate ID130 typer.echo("Generating unique ID...")131 job_id = self._generate_id()132 # Upload blend file to S3133 typer.echo("Uploading blend file to S3...")134 obj_key = f"jobs/{job_id}/main.blend"135 self.s3_client.upload_file(Filename=blend_path, Bucket=BUCKET_NAME, Key=obj_key)136 # Create AWS Batch jobs137 typer.echo("Creating batch jobs...")138 batch_jobs = self._create_batch_jobs(job_id, start_frame, end_frame, gpu)139 # Create pydantic model140 job = Job(141 job_id=job_id,142 creation_date=datetime.now(),143 children=batch_jobs,144 start_frame=start_frame,145 end_frame=end_frame,146 gpu=gpu,147 file_name=Path(blend_path).name,148 status=STATUS_RUNNING,149 )150 self.state[job.job_id] = job151 # Persist state152 self._persist_state()153 typer.echo(f"Created render job with id: {job.job_id}")154 return job155 def list_jobs(self) -> List[Job]:156 """List available jobs in descending order of their creation date"""157 # Reload state158 self._load_state()159 # Refresh job statuses160 for job in self.state.values():161 if job.status not in (STATUS_SUCCEEDED, STATUS_ERROR):162 self.state[job.job_id] = self._refresh_job(job)163 # Persist state164 self._persist_state()165 # Parse jobs166 jobs = list(self.state.values())167 jobs = sorted(jobs, key=lambda job: job.creation_date, reverse=True)168 return jobs169 def _refresh_children(self, children: Dict[int, BatchJob]) -> Dict[int, BatchJob]:170 """Refresh a set of batch jobs by querying the AWS API"""171 job_ids = [job.batch_id for job in children.values()]172 id_maps = {val.batch_id: key for key, val in children.items()}173 # Iterate over chunks of 100 jobs maximum (limited by AWS)174 cur_chunk, chunk_size = 0, 100175 while cur_chunk < (len(children) // chunk_size) + 1:176 chunk_start, chunk_end = (177 cur_chunk * chunk_size,178 (cur_chunk + 1) * chunk_size,179 )180 # Request batch of jobs181 response = self.batch_client.describe_jobs(jobs=job_ids[chunk_start:chunk_end])182 # Parse results183 for obj in response["jobs"]:184 children[id_maps[obj["jobId"]]].status = obj["status"]185 # Check if startedAt is set186 if "startedAt" in obj:187 unix_ts = int(str(obj["startedAt"])[0:-3])188 children[id_maps[obj["jobId"]]].started_at = datetime.fromtimestamp(unix_ts)189 if "stoppedAt" in obj:190 unix_ts = int(str(obj["stoppedAt"])[0:-3])191 children[id_maps[obj["jobId"]]].stopped_at = datetime.fromtimestamp(unix_ts)192 cur_chunk += 1193 return children194 def _refresh_job(self, job: Job) -> Job:195 """Refresh a job by querying its children and checking it's status"""196 # Refresh children197 job.children = self._refresh_children(job.children)198 # Determine parent job's status199 completion_date = None200 running = False201 ran_into_error = False202 for batch_job in job.children.values():203 # Check if error encountered204 if batch_job.status == "FAILED":205 ran_into_error = True206 # Check if job is still running207 if batch_job.status in (208 "SUBMITTED",209 "PENDING",210 "RUNNABLE",211 "STARTING",212 "RUNNING",213 ):214 running = True215 # Set completion time216 if batch_job.stopped_at is not None:217 if completion_date is None or completion_date < batch_job.stopped_at:218 completion_date = batch_job.stopped_at219 if not running and not ran_into_error:220 job.status = STATUS_SUCCEEDED221 elif not running and ran_into_error:222 job.status = STATUS_ERROR223 else:224 job.status = STATUS_RUNNING225 job.completion_date = completion_date226 return job227 def get_job(self, job_id: str) -> Optional[Job]:228 """Fetch a job from state by its job ID"""229 # Refresh state230 self._load_state()231 # Return None if not found232 if job_id not in self.state:233 return None234 job = self.state[job_id]235 # Update job236 job = self._refresh_job(job)237 # Persist state238 self.state[job_id] = job239 self._persist_state()240 return job241 def delete_job(self, job: Job) -> None:242 """Cancel a job and remove it from the state"""243 # Refresh state244 self._load_state()245 # Cancel children246 typer.echo("Cancelling batch jobs...")247 for batch_job in job.children.values():248 self.batch_client.cancel_job(jobId=batch_job.batch_id, reason="Canceled by user.")249 # Remove S3 directory250 typer.echo("Removing artifacts...")251 self.bucket.objects.filter(Prefix=f"jobs/{job.job_id}").delete()252 # Remove job from state253 typer.echo("Delete job from state...")254 del self.state[job.job_id]255 # Persist state256 self._persist_state()257 def sync_files(self, job: Job, output_path: str) -> None:258 """Sync a job's files to local disk."""259 # Fetch all objects belonging to a specific job260 job_path = f"jobs/{job.job_id}"261 job_objects = self.bucket.objects.filter(Prefix=job_path)262 # Download each non-blend object263 for obj in job_objects:264 if obj.key.split(".")[-1] != "blend":265 # Determine output path266 obj_subpath = obj.key.split(job_path + "/")[1]267 output_file = Path(output_path) / Path(obj_subpath)268 print(output_file)269 # Create directory if it does not exist270 if not os.path.exists(str(output_file.parent)):...

Full Screen

Full Screen

state.py

Source:state.py Github

copy

Full Screen

...12 self._state = self._load_state()13 return self._state14 def save(self):15 with self.lock:16 self._persist_state()17 18 def _load_state(self):19 if not os.path.exists(self.filename):20 return {}21 with open(self.filename, 'rb') as f:22 return pickle.load(f)23 24 def _persist_state(self):25 with open(self.filename, 'wb') as f:26 pickle.dump(self._state, f)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful