How to use shelve_open method in autotest

Best Python code snippet using autotest_python Github


Full Screen

...20 if not isdir( return21 if not exists(self.shelvepath): return22 if not isfile(self.shelvepath): return23 with LockFile(self.shelvepath, timeout=10) as lock:24 shelve = shelve_open(self.shelvepath)25 try:26 if set(shelve.keys()) != set(['individuals', 'functionals', 'removed', 'added', 'new']):27 raise RuntimeError('{0} is not a GA SHELVECAR file.'.format(self.shelvepath))28 finally: shelve.close()29 @property30 def success(self): return False31 @property32 def directory(self):33 """ Directory where output should be found. """34 return self._directory.path35 @property36 def shelvepath(self):37 """ Directory where output should be found. """38 from os.path import join39 from .functional import Functional40 return join(, Functional.SHELVECAR)41 @property42 def functionals(self):43 """ Functionals at each generation. """44 from shelve import open as shelve_open45 from ..misc import LockFile46 with LockFile(self.shelvepath, timeout=10) as lock:47 try:48 shelve = shelve_open(self.shelvepath)49 return shelve['functionals']50 finally: shelve.close()51 @property 52 def populations(self):53 """ Populations at each generation. """54 return [func.population for func in self.functionals]55 @property56 def functional(self):57 """ Return current functional. """58 return self.functionals[-1]59 @property60 def generation(self):61 """ Returns current generation. """62 return self.functional.generation63 @property64 def population(self):65 """ Returns current population. """66 return self.functional.population67 @property68 def offspring(self):69 """ Returns current population. """70 return self.functional.offspring71 72 @property 73 def added(self):74 """ Individuals added at each generation. """75 from shelve import open as shelve_open76 from ..misc import LockFile77 with LockFile(self.shelvepath, timeout=10) as lock:78 try:79 shelve = shelve_open(self.shelvepath)80 result = []81 for add in shelve['added']:82 result.extend(shelve['individuals'][i] for i in add)83 return result84 finally: shelve.close()85 @property 86 def removed(self):87 """ Individuals removed at each generation. """88 from shelve import open as shelve_open89 from ..misc import LockFile90 with LockFile(self.shelvepath, timeout=10) as lock:91 try:92 shelve = shelve_open(self.shelvepath)93 result = []94 for r in shelve['removed']:95 result.extend(shelve['individuals'][i] for i in r)96 return result97 finally: shelve.close()98 def best(self, n=10): 99 """ Current n best individuals. 100 :param int n: 101 Number of individuals to print.102 """103 from operator import itemgetter104 from ..jobfolder.forwarding_dict import ForwardingDict105 individuals = self.individuals106 fitness = sorted( [(name, for name, u in individuals.iteritems()],\107 key=itemgetter(1) )[:n]108 result = ForwardingDict(readonly=True, ordered=False)109 for key, value in fitness: result[key] = individuals[key]110 return result111 112 @property113 def individuals(self):114 """ Dictionary containing all individuals. """115 from shelve import open as shelve_open116 from ..misc import LockFile117 with LockFile(self.shelvepath, timeout=10) as lock:118 try: shelve = shelve_open(self.shelvepath)119 except: raise120 else: return shelve['individuals']...

Full Screen

Full Screen Github


Full Screen

...8# global config9BRANCHES = ["refs/heads/master", "refs/heads/km"]10DEPLOY_KEY = "[ci deploy]"11@contextlib.contextmanager12def shelve_open(name, **kwargs):13 try:14 db =, **kwargs)15 yield db16 finally:17 db.close()18def incr(key, record):19 with shelve_open("shelve.db") as db:20 db[key] += 121 db["last_time"] = str( db_records = db["records"]23 db_records.append(record)24 db["records"] = db_records25def fetch_stats():26 with shelve_open("shelve.db") as db:27 return dict(db)28class MainHandler(tornado.web.RequestHandler):29 def get(self):30 self.write(fetch_stats())31 def post(self):32 print(self.request.body)33 data = json.loads(self.request.body)34 action = data.get("object_kind")35 if action == "push":36 target_branch = data.get("ref")37 if target_branch not in BRANCHES:38 self.write("branch skipped")39 return40 last_commit = data.get("commits")[0]41 last_commit_message = last_commit["message"]42 if DEPLOY_KEY not in last_commit_message:43 self.write("push skipped")44 return45 deploy_cmd = "python xxx"46 subprocess.Popen(deploy_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)47 incr(48 "total", [data["user_name"], action, target_branch, last_commit_message,49 last_commit["timestamp"], ],50 )51 elif action == "merge_request":52 object_attributes = data["object_attributes"]53 target_branch, title = object_attributes["target_branch"], object_attributes["title"]54 if target_branch not in BRANCHES:55 self.write("target_branch skipped")56 return57 if object_attributes["state"] != "merged":58 self.write("not merged skipped")59 return60 if DEPLOY_KEY not in object_attributes["title"]:61 self.write("title skipped")62 return63 deploy_cmd = "python xxx"64 subprocess.Popen(deploy_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)65 incr(66 "total",67 [object_attributes["user"]["name"], action, target_branch, title,68 object_attributes["updated_at"], ],69 )70 self.write("success")71def make_app():72 return tornado.web.Application([(r"/", MainHandler), ])73if __name__ == "__main__":74 app = make_app()75 app.listen(8888)76 # store task stats in shelve.db77 with shelve_open("shelve.db", writeback=True) as db:78 if not db.keys():79 db.update(last_time=None, total=0, succeed=0, failed=0, records=[])80 print("please visit http://localhost:8888")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:


You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?