Best Python code snippet using tempest_python
RC.py
Source:RC.py  
...10class Resource:11    def __init__(self):12        global  open_state13        open_state = False14        self.show_resource()1516    def add_resource(self,dict):17        self.show_resource()18        # self.json_file  is updated19        self.resources_list.append(dict)20        # print(self.resources_list)21        # threadlock.acquire()22        global open_state23        if not open_state:24            open_state = True25            with open('logfile.json', 'w') as logfile:26                json.dump(self.json_file, logfile)27            logfile.close()28            open_state = False29        # threadlock.release()3031    def show_resource(self):32        global open_state33        try:34            if not open_state:35                open_state = True36                with open('logfile.json', 'r') as logfile:37                    self.json_file = json.load(logfile)38                logfile.close()39                open_state = False40            self.resources_list = self.json_file["list_of_RCs"]41        except:42            reset_str = {"outer_part": "hello", "list_of_RCs": [],"delete_duration":25}43            # print(type(reset_str))44            print('error in config file now resetting it...')45            # threadlock.acquire()46            open_state = True47            with open('logfile.json', 'w') as logfile:48                self.json_file = json.dump(reset_str, logfile)49            logfile.close()50            open_state = False51            # threadlock.release()525354    def update_resource(self,dict):55        self.show_resource()56        global open_state57        # get the list of resources from the json-formatted log file58        for R in self.resources_list:59            if dict['resource_name']==R['resource_name']:60                # print('\n\n found it\n \n')61                RC_list_index =self.resources_list.index(R)62                R = dict63                R['Updated'] = ((time.time()))64                self.resources_list[RC_list_index]=dict65        # threadlock.acquire()66        if not open_state:67            open_state = True68            with open('logfile.json', 'w') as logfile:69                json.dump(self.json_file, logfile)70            logfile.close()71            open_state = False72        # threadlock.release()73    def delete_resource(self,name):74        # get data from DEL request body75        global open_state76        self.show_resource()77        for R in self.resources_list:78            # removing only the first name that match79            if R['resource_name'] == name:80                # print(resources_list)81                self.resources_list.remove(R)82                # print(resources_list)83        # backing things up84        # threadlock.acquire()8586        if not open_state:87            open_state = True88            with open('logfile.json', 'w') as logfile:89                json.dump(self.json_file, logfile)90            logfile.close()91            open_state = False92            # threadlock.release()939495class Resource_cat:96    exposed = True9798    def __init__(self):99        self.CAT = Resource()100101    def GET(self):102        # self.CAT.json_file  will be updated103        self.CAT.show_resource()104        # print(json_file)105        return json.dumps(self.CAT.json_file)106    # maybe implement a filter using URI (future work)107108    def POST(self):109        # get the json to be added from POST request body110        to_add= cherrypy.request.body.read()111        data=to_add.decode('utf-8')112        dict = json.loads(data)113        #updating time114        dict['Updated'] = (time.time())115        #adding the new resource to the log file116        self.CAT.add_resource(dict)117        # returning the new updated resource catalog json file118        self.CAT.json_file = self.CAT.show_resource()119        return json.dumps(self.CAT.json_file)120121122    def PUT(self):123        #get data from PUT request body124        to_edit = cherrypy.request.body.read()125        data = to_edit.decode('utf-8')126        dict = json.loads(data)127128        self.CAT.update_resource(dict)129        self.CATjson_file = self.CAT.show_resource()130        return json.dumps(self.CAT.json_file)131132    def DELETE(self,*uri):133        #get data from DEL request body134        self.CAT.show_resource()135        name_to_be_removed = uri[0]136        self.CAT.delete_resource(name_to_be_removed)137        self.CAT.show_resource()138        return json.dumps(self.CAT.json_file)139140141class  sc_registration_thread(threading.Thread):142143    def __init__(self, thread_ID):144        threading.Thread.__init__(self)145        self.thread_ID = thread_ID146        with open('initialization.json') as file:147            self.dicto = json.load(file)148        # print(dict)149        file.close()150151    def run(self):
...RC_local.py
Source:RC_local.py  
...10class Resource:11    def __init__(self):12        global  open_state13        open_state = False14        self.show_resource()1516    def add_resource(self,dict):17        self.show_resource()18        # self.json_file  is updated19        self.resources_list.append(dict)20        # print(self.resources_list)21        # threadlock.acquire()22        global open_state23        if not open_state:24            open_state = True25            with open('logfile.json', 'w') as logfile:26                json.dump(self.json_file, logfile)27            logfile.close()28            open_state = False29        # threadlock.release()3031    def show_resource(self):32        global open_state33        try:34            if not open_state:35                open_state = True36                with open('logfile.json', 'r') as logfile:37                    self.json_file = json.load(logfile)38                logfile.close()39                open_state = False40            self.resources_list = self.json_file["list_of_RCs"]41        except:42            reset_str = {"outer_part": "hello", "list_of_RCs": [],"delete_duration":25}43            # print(type(reset_str))44            print('error in config file now resetting it...')45            # threadlock.acquire()46            open_state = True47            with open('logfile.json', 'w') as logfile:48                self.json_file = json.dump(reset_str, logfile)49            logfile.close()50            open_state = False51            # threadlock.release()525354    def update_resource(self,dict):55        self.show_resource()56        global open_state57        # get the list of resources from the json-formatted log file58        for R in self.resources_list:59            if dict['resource_name']==R['resource_name']:60                # print('\n\n found it\n \n')61                RC_list_index =self.resources_list.index(R)62                R = dict63                R['Updated'] = ((time.time()))64                self.resources_list[RC_list_index]=dict65        # threadlock.acquire()66        if not open_state:67            open_state = True68            with open('logfile.json', 'w') as logfile:69                json.dump(self.json_file, logfile)70            logfile.close()71            open_state = False72        # threadlock.release()73    def delete_resource(self,name):74        # get data from DEL request body75        global open_state76        self.show_resource()77        for R in self.resources_list:78            # removing only the first name that match79            if R['resource_name'] == name:80                # print(resources_list)81                self.resources_list.remove(R)82                # print(resources_list)83        # backing things up84        # threadlock.acquire()8586        if not open_state:87            open_state = True88            with open('logfile.json', 'w') as logfile:89                json.dump(self.json_file, logfile)90            logfile.close()91            open_state = False92            # threadlock.release()939495class Resource_cat:96    exposed = True9798    def __init__(self):99        self.CAT = Resource()100101    def GET(self):102        # self.CAT.json_file  will be updated103        self.CAT.show_resource()104        # print(json_file)105        return json.dumps(self.CAT.json_file)106    # maybe implement a filter using URI (future work)107108    def POST(self):109        # get the json to be added from POST request body110        to_add= cherrypy.request.body.read()111        data=to_add.decode('utf-8')112        dict = json.loads(data)113        #updating time114        dict['Updated'] = (time.time())115        #adding the new resource to the log file116        self.CAT.add_resource(dict)117        # returning the new updated resource catalog json file118        self.CAT.json_file = self.CAT.show_resource()119        return json.dumps(self.CAT.json_file)120121122    def PUT(self):123        #get data from PUT request body124        to_edit = cherrypy.request.body.read()125        data = to_edit.decode('utf-8')126        dict = json.loads(data)127128        self.CAT.update_resource(dict)129        self.CATjson_file = self.CAT.show_resource()130        return json.dumps(self.CAT.json_file)131132    def DELETE(self,*uri):133        #get data from DEL request body134        self.CAT.show_resource()135        name_to_be_removed = uri[0]136        self.CAT.delete_resource(name_to_be_removed)137        self.CAT.show_resource()138        return json.dumps(self.CAT.json_file)139140141class  sc_registration_thread(threading.Thread):142143    def __init__(self, thread_ID):144        threading.Thread.__init__(self)145        self.thread_ID = thread_ID146    def run(self):147        # registering and updating of the registration148        # url = 'http://linksmart:8082/'  # when using a docker container149        url = 'http://localhost:8082/'150        reg.registration('Resource_CATALOG.json', url,'RC')151
...yyets.py
Source:yyets.py  
...5# YYeTs API6# Python 2 & 37__author__ = 'Benny <benny@bennythink.com>'8import requests9def show_resource(resource_id):10    """11    show resource information.12    :param resource_id: unique id for each resource, get from `search_resource`13    :return: download link in json14    """15    url = 'http://pc.zmzapi.com/index.php?g=api/pv2&m=index&a=resource&accesskey=519f9cab85c8059d17544947k361a827&id='16    r = requests.get(url + resource_id)17    return r.json()18def search_resource(name):19    """20    search resource21    :param name: tv/movie name22    :return: the whole search result23    """24    url = 'http://pc.zmzapi.com/index.php?g=api/pv2&m=index&a=search&accesskey=519f9cab85c8059d17544947k361a827&limit=200&k='25    r = requests.get(url + name)26    return r.json()27def query_resource(user_raw_message):28    """29    query resource name30    :param user_raw_message: `/query batman`31    :return: message to be sent/32    """33    name = user_raw_message[6:].lstrip(' ').rstrip(' ')34    search_result = search_resource(name)35    msg = ''36    s = search_result.get('data')37    if s is None:38        return '没æå¯¹åºçèµæºï¼ä½¿ç¨æ¹æ³/query éé¿å¯è»å´æç¨'39    elif len(s) > 1:40        for key in range(len(s)):41            msg = msg + ('%d. %s %s' % (key + 1, s[key].get('channel'), s[key].get('title'))) + '\n'42    return msg43def get_season_count(tv_name):44    """45    if it was movie, get download link whenever possible; if it was tv, get seasons count46    :param tv_name: name, such as `avatar`, `Black Mirror`47    :return: download link, `0, err_msg` or `season count, dl link`48    """49    search_result = search_resource(tv_name)50    # return the first result!51    if search_result.get('data') is None or search_result.get('data') == u'':52        return 0, '没æè¿ä¸ªèµæº'53    elif len(search_result.get('data')) > 1:54        search_result.update(data=[search_result.get('data')[0]])55    if search_result.get('data')[0].get('channel') == 'movie':56        # TODO: AccessKey  4002 èµæºå
³é 4003ææ¶æ²¡æèµæº57        download_link = show_resource(search_result.get('data')[0].get('id'))58        return (0, 'ç±äºaccesskeyçåå ï¼çµå½±æç´¢åºæ¬æ¯åºçï¼æ£å¨å°è¯ä¿®å¤ä¸â¦â¦') if download_link.get(59            'status') != 1 else (255, get_movie_link(download_link))60    else:61        download_link = show_resource(search_result.get('data')[0].get('id'))62        if download_link.get('status') != 1:63            return 0, 'èµæºä¸å¯ç¨'64        season = download_link.get('data').get('list')[0].get('season')65        return (1, download_link) if season == '101' else (int(season), download_link)66def get_movie_link(data):67    """68    get movie link69    :param data: link generate by `show_resource`70    :return: pass two params to iter_link to get actual link71    """72    dl_list = data.get('data').get('list')[0].get('episodes')[0]73    dl_hr_mp4 = dl_list.get('files').get('HR-HDTV') or dl_list.get('files').get('MP4')74    dl_app = dl_list.get('files').get('APP')75    return iter_link(dl_app, dl_hr_mp4)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
