Best Python code snippet using localstack_python
deltamc.py
Source:deltamc.py  
...
# Start-up sequence of the DeltaMC entry script. Earlier lines are elided in
# this snippet; names such as gui, root, args, version and mc_version are
# expected to be defined there.
if gui:
    tkinst = Gui(root)
transfer_tkinst()
cprint("You are running " + sys.platform)
update_archive(True)
if gui:
    delta_gui.updateinst()
    tkinst.update_modlist()
check_for_updates()
cprint("DeltaMC v" + version)

# Select the working instance: the GUI's current instance unless one was given
# on the command line. "@ERROR@" falls back to the GUI's instance again.
instance = delta_gui.instance
if args.instance != "None":
    instance = args.instance
if instance == "@ERROR@":
    instance = delta_gui.instance
cprint("Selected Instance: " + instance)
cprint("Minecraft Version: " + mc_version)

# Report pending upgrades for the selected instance.
upgradesavailable = delta_upgrade.get_upgrades(instance)
if upgradesavailable != []:
    cprint("The following upgrades are available for instance " + instance + ":")
    for upgrade in upgradesavailable:
        cprint(" " + upgrade[0].name
               + " (current version: " + upgrade[1].versions[0]["Version"]
               + ", you have: " + upgrade[0].versions[0]["Version"] + ")")

# One-shot command line actions. The string "None" is the "not supplied"
# sentinel for every option (presumably the argparse default -- confirm in the
# elided part of the file). Each action terminates the process when done.
if args.install != "None":
    delta_install.install_mod(args.install)
    sys.exit()
if args.remove != "None":
    delta_remove.remove_mod(args.remove)
    sys.exit()
if args.upgrade != "None":
    delta_upgrade.upgrade_mod(args.upgrade)
    sys.exit()
if args.info != "None":
    get_info(args.info, output=True)
    sys.exit()
if args.export != "None":
    delta_importexport.export_mods(args.export)
    sys.exit()
if args.importa != "None":
    delta_importexport.import_mods(args.importa)
    sys.exit()
if gui == False:
    print_help()


def _optional_argument(tokens):
    """Parse the "<command> [argument]" form.

    Returns ``(True, argument)`` for exactly one non-empty argument,
    ``(True, None)`` when the command was given alone and ``(False, None)``
    for any other shape.
    """
    if len(tokens) == 2 and tokens[1] != "":
        return True, tokens[1]
    if len(tokens) == 1:
        return True, None
    return False, None


def _resolve_instance(tokens):
    """Parse the "<command> [instance]" form, prompting when no name is given.

    "*" selects every instance and is returned as ``None``. Problems are
    reported through ``cprint``. Returns ``(ok, instance_name)``; the caller
    must stop when ``ok`` is false.
    """
    if len(tokens) == 2 and tokens[1] != "":
        inst = tokens[1]
    elif len(tokens) == 1:
        inst = input("Enter instance name: ")
    else:
        cprint("Invalid command syntax.")
        return False, None
    if inst == "*":
        inst = None
    # Test for None first so instance_exists() is never called with None.
    if inst is not None and not instance_exists(inst):
        cprint("Instance " + inst + " does not exist.")
        return False, None
    return True, inst


def _apply_to_many(tokens, verb, action):
    """Run ``action`` once for every mod name following the command word."""
    if len(tokens) < 2:
        cprint("Invalid command syntax.")
        return
    modslist = tokens[1:]  # mod names are separated with spaces
    update_archive()
    cprint("Attempting to " + verb + ": " + ", ".join(modslist) + "...")
    for item in modslist:
        action(item)


def parsecmd(command):
    """Execute one interactive DeltaMC command line.

    The line is split on single spaces and the first token selects the
    command: update, upgrades, upgrade, upgradeall, install, remove, info,
    installm/installmany, removem/removemany, upgradem/upgrademany, export,
    import, instance/inst. Syntax errors are reported through ``cprint``.

    :param command: The raw command line typed by the user.
    """
    # Split on a literal space (not on arbitrary whitespace) so that a trailing
    # space still produces an empty token and is rejected as before.
    tokens = command.split(" ")
    name = tokens[0]

    if name == "update":
        update_archive()
    elif name == "upgrades":
        ok, inst = _resolve_instance(tokens)
        if not ok:
            return
        update_archive()
        delta_upgrade.check_upgrades(True, inst)
    elif name == "upgrade":
        ok, mod = _optional_argument(tokens)
        if not ok:
            cprint("Invalid command syntax.")
            return
        update_archive()
        # The result was never used by the original either; the call is kept
        # in case get_upgrades() has side effects upgrade_mod() relies on.
        delta_upgrade.get_upgrades()
        delta_upgrade.upgrade_mod(mod)
    elif name == "upgradeall":
        ok, inst = _resolve_instance(tokens)
        if not ok:
            # Previously a syntax error fell through and crashed on an
            # unbound ``inst``.
            return
        update_archive()
        updates = delta_upgrade.get_upgrades(inst)
        if len(updates) == 0:
            cprint("No upgrades available.")
        else:
            for update in updates:
                # NOTE(review): the start-up code reads upgrade[0].name while
                # this reads update[0]["Name"]; confirm which access the mod
                # objects actually support.
                delta_upgrade.upgrade_mod(update[0]["Name"])
    elif name == "install":
        if len(tokens) == 3 and tokens[2] != "":
            mod = tokens[1]
            mod_version = tokens[2]
            update_archive()
            # Pass the requested mod version; the original passed the global
            # DeltaMC ``version`` string by mistake.
            delta_install.install_mod(mod, mod_version)
        else:
            ok, mod = _optional_argument(tokens)
            if ok:
                update_archive()
                delta_install.install_mod(mod)
            else:
                cprint("Invalid command syntax.")
                cprint(str(len(tokens)))
    elif name == "remove":
        ok, mod = _optional_argument(tokens)
        if ok:
            delta_remove.remove_mod(mod)
        else:
            cprint("Invalid command syntax.")
    elif name == "info":
        ok, mod = _optional_argument(tokens)
        if ok:
            get_info(mod, output=True)
        else:
            # Previously extra arguments were silently ignored.
            cprint("Invalid command syntax.")
    elif name == "installm" or name == "installmany":
        _apply_to_many(tokens, "install", delta_install.install_mod)
    elif name == "removem" or name == "removemany":
        _apply_to_many(tokens, "remove", delta_remove.remove_mod)
    elif name == "upgradem" or name == "upgrademany":
        _apply_to_many(tokens, "upgrade", delta_upgrade.upgrade_mod)
    elif name == "export":
        ok, target = _optional_argument(tokens)
        if ok:
            update_archive()
            delta_importexport.export_mods(target)
        else:
            cprint("Invalid command syntax.")
    elif name == "import":
        ok, path = _optional_argument(tokens)
        if ok:
            update_archive()
            delta_importexport.import_mods(path)
        else:
            cprint("Invalid command syntax.")
    elif name == "instance" or name == "inst":
        if len(tokens) == 2 and tokens[1] != "":
            target = tokens[1]
            if instance_exists(target):
                if target == instance:
                    cprint("Instance " + target + " already selected!")
                else:
                    setup_config(target)
                    cprint("Switched to instance " + target + ".")
            else:
                cprint("Instance " + target + " does not exist.")
    # The snippet is truncated here; the original function continues.
...
core.py
Source:core.py  
""" Base class implementation for core features present in the updater module.

This is the updater core class which provides all facilities other implementations should rely on.
This class should not be used directly, use a derived class instead.
"""
import contextlib
import io
import os
import platform
import zipfile
import logging
import json
import urllib.request
from pubsub import pub # type: ignore
from typing import Optional, Dict, Tuple, Union, Any
from . import paths

log = logging.getLogger("updater.core")


class UpdaterCore(object):
    """ Base class for all updater implementations.

    Implementations must add user interaction methods and call logic for all methods present in this class.
    """

    def __init__(self, endpoint: str, current_version: str, app_name: str = "", password: Optional[bytes] = None) -> None:
        """
        :param endpoint: The URL endpoint where the module should retrieve update information. It must return a json valid response or a non 200 HTTP status code.
        :type endpoint: str
        :param current_version: Application's current version.
        :type current_version: str
        :param app_name: Name of the application.
            (default is empty)
        :type app_name: str
        :param password: Password for update zipfile.
        :type password: bytes
        """
        self.endpoint = endpoint
        self.current_version = current_version
        self.app_name = app_name
        self.password = password

    def get_update_information(self) -> Dict[str, Any]:
        """ Calls the provided URL endpoint and returns information about the available update sent by the server. The format should adhere to the json specifications for updates.

        If the server returns a status code different to 200 or the json file is not valid, this will raise either a :py:exc:`urllib.error.HTTPError` or a :external:py:exc:`json.JSONDecodeError`.

        :rtype: dict
        """
        # The context manager closes the HTTP response even if reading fails.
        with urllib.request.urlopen(self.endpoint) as response:
            data: bytes = response.read()
        content: Dict[str, Any] = json.loads(data)
        return content

    def get_version_data(self, content: Dict[str, Any]) -> Tuple[Union[bool, str], Union[bool, str], Union[bool, str]]:
        """ Parses the dictionary returned by :py:func:`get_update_information` and, if there is a new update available, returns information about it in a tuple.

        The method checks whether :py:attr:`current_version` is different to the version reported in the update file, and whether the json specification file contains a binary link for the user's architecture. If both of these conditions are True, a tuple is returned with (new_version, update_description, update_url).
        If there is no update available, a tuple with False values is returned.
        This method raises a KeyError if there are no updates for the current architecture defined in the update file.

        :param content: Update information as returned by :py:func:`get_update_information`.
        :type content: dict
        :returns: tuple with update information or False values.
        :rtype: tuple
        """
        available_version = content["current_version"]
        # Key is the system name followed by the first two characters of the
        # architecture string, e.g. "Windows64" or "Linux32".
        update_url_key = platform.system() + platform.architecture()[0][:2]
        if available_version == self.current_version:
            return (False, False, False)
        if content["downloads"].get(update_url_key) is None:
            log.error("Update file doesn't include architecture {}".format(update_url_key))
            raise KeyError("Update file doesn't include current architecture.")
        available_description = content["description"]
        update_url = content["downloads"][update_url_key]
        return (available_version, available_description, update_url)

    def download_update(self, update_url: str, update_destination: str, chunk_size: int = io.DEFAULT_BUFFER_SIZE) -> str:
        """ Downloads an update URL and notifies all subscribers of the download progress.

        This function will send a pubsub notification every time the download progress updates by using :py:func:`pubsub.pub.sendMessage` under the topic "updater.update-progress".
        You might subscribe to this notification by using :py:func:`pubsub.pub.subscribe` with a function with this signature:
        ``def receive_progress(total_downloaded: int, total_size: int):``
        In this function, it is possible to update the UI progress bar if needed.
        Don't forget to call :py:func:`pubsub.pub.unsubscribe` at the end of the update.

        :param update_url: Direct link to update zip file.
        :type update_url: str
        :param update_destination: Destination path to save the update file
        :type update_destination: str
        :param chunk_size: chunk size for downloading the update (default to :py:data:`io.DEFAULT_BUFFER_SIZE`). Kept for interface compatibility; it is not used by this implementation.
        :type chunk_size: int
        :returns: The update file path in the system.
        :rtype: str
        """
        def _download_callback(transferred_blocks, block_size, total_size):
            total_downloaded = transferred_blocks * block_size
            # The block count is rounded up, so the product can overshoot the
            # real size on the last block; clamp it when the size is known.
            if total_size > 0:
                total_downloaded = min(total_downloaded, total_size)
            pub.sendMessage("updater.update-progress", total_downloaded=total_downloaded, total_size=total_size)

        urllib.request.urlretrieve(update_url, update_destination, _download_callback)
        log.debug("Update downloaded")
        return update_destination

    def extract_update(self, update_archive: str, destination: str) -> str:
        """ Given an update archive, extracts it. Returns the directory to which it has been extracted.

        :param update_archive: Path to the update file.
        :type update_archive: str
        :param destination: Path to extract the archive. User must have permission to do file operations on the path.
        :type destination: str
        :returns: Path where the archive has been extracted.
        :rtype: str
        """
        with zipfile.ZipFile(update_archive) as archive:
            if self.password:
                archive.setpassword(self.password)
            archive.extractall(path=destination)
        log.debug("Update extracted")
        return destination

    def move_bootstrap(self, extracted_path: str) -> str:
        """ Moves the bootstrapper binary from the update extraction folder to a working path, so it will be able to perform operations under the update directory later.

        :param extracted_path: Path to which the update file has been extracted.
        :type extracted_path: str
        :returns: The path to the bootstrap binary to be run to actually perform the update.
        :rtype: str
        """
        working_path = os.path.abspath(os.path.join(extracted_path, '..'))
        if platform.system() == 'Darwin':
            extracted_path = os.path.join(extracted_path, 'Contents', 'Resources')
        downloaded_bootstrap = os.path.join(extracted_path, self.bootstrap_name())
        new_bootstrap_path = os.path.join(working_path, self.bootstrap_name())
        # os.replace overwrites a bootstrapper left behind by an earlier
        # update; os.rename raises on Windows when the target exists.
        os.replace(downloaded_bootstrap, new_bootstrap_path)
        return new_bootstrap_path

    def execute_bootstrap(self, bootstrap_path: str, source_path: str) -> None:
        """ Executes the bootstrapper binary, which will move the files from the update directory to the app folder, finishing with the update process.

        The bootstrapper receives four arguments: the current process id, the source path, the application path and the application executable.

        :param bootstrap_path: Path to the bootstrap binary that will perform the update, as returned by :py:func:`move_bootstrap`
        :type bootstrap_path: str
        :param source_path: Path where the update file was extracted, as returned by :py:func:`extract_update`
        :type source_path: str
        """
        bootstrap_args = (os.getpid(), source_path, paths.app_path(), paths.get_executable())
        if platform.system() == 'Windows':
            import win32api # type: ignore
            arguments = r'"%s" "%s" "%s" "%s"' % bootstrap_args
            win32api.ShellExecute(0, 'open', bootstrap_path, arguments, '', 5)
        else:
            import shlex
            import subprocess
            self.make_executable(bootstrap_path)
            # Quote every part (including the bootstrap path itself) so that
            # spaces or shell metacharacters in paths cannot break the command.
            command = " ".join(shlex.quote(str(part)) for part in (bootstrap_path,) + bootstrap_args)
            subprocess.Popen(command, shell=True)
        log.info("Bootstrap executed")

    def bootstrap_name(self) -> str:
        """ Returns the name of the bootstrapper, based on the user's platform.

        :rtype: str
        """
        if platform.system() == 'Windows':
            return 'bootstrap.exe'
        elif platform.system() == 'Darwin':
            return 'bootstrap-mac.sh'
        return 'bootstrap-lin.sh'

    def make_executable(self, path: str) -> None:
        """ Set execution permissions in a script on Unix platforms. """
        import stat
        st = os.stat(path)
...
sync.py
Source:sync.py  
1import bibtexparser2import json3import os4import pickle5import pprint6import requests7"""8TODO:9- Default icon for papers.10"""11ARCHIVE_PATH = 'archive.pk'12BIB_PATH = 'references.bib'13NOTION_TOKEN = os.environ['NOTION_TOKEN']14DATABASE_IDENTIFIER = os.environ['DATABASE_IDENTIFIER']15def notion_add_entry(16    title='',17    authors='',18    year='0',19    ref_id='',20    link='',21):22    url = "https://api.notion.com/v1/pages"23    payload = {24        "parent": {25            'database_id': DATABASE_IDENTIFIER,26        },27        "properties": {28            'Title': {29                'title': [{30                    'text': {31                        'content': title,32                        }33                    }]34                },35            'Authors': {36                "rich_text": [{37                    "type": "text",38                    "text": {39                        "content": authors,40                    }41                }],42            },43            'Year': {44                "rich_text": [{45                    "type": "text",46                    "text": {47                        "content": year,48                    }49                }],50            },51            'Reference ID': {52                "rich_text": [{53                    "type": "text",54                    "text": {55                        "content": ref_id,56                    }57                }],58            },59            'Link': {60                "url": link,61            },62                    63                    64        },65    }66    #  pprint.pprint(payload)67    headers = {68        "Accept": "application/json",69        "Notion-Version": "2022-06-28",70        "Content-Type": "application/json",71        "Authorization": f"Bearer {NOTION_TOKEN}"72    }73    response = requests.post(url, json=payload, headers=headers)74    #  pprint.pprint(json.loads(response.text))75def notion_update_page(76    page_id,77    title='',78    
authors='',79    year='0',80    ref_id='',81    link='',82):83    url = f"https://api.notion.com/v1/pages/{page_id}"84    payload = {85        "parent": {86            'database_id': DATABASE_IDENTIFIER,87        },88        "properties": {89            'Title': {90                'title': [{91                    'text': {92                        'content': title,93                        }94                    }]95                },96            'Authors': {97                "rich_text": [{98                    "type": "text",99                    "text": {100                        "content": authors,101                    }102                }],103            },104            'Year': {105                "rich_text": [{106                    "type": "text",107                    "text": {108                        "content": year,109                    }110                }],111            },112            'Reference ID': {113                "rich_text": [{114                    "type": "text",115                    "text": {116                        "content": ref_id,117                    }118                }],119            },120            'Link': {121                "url": link,122            },123                    124                    125        },126    }127    #  pprint.pprint(payload)128    headers = {129        "Accept": "application/json",130        "Notion-Version": "2022-06-28",131        "Content-Type": "application/json",132        "Authorization": f"Bearer {NOTION_TOKEN}"133    }134    response = requests.patch(url, json=payload, headers=headers)135    print(response)136def notion_fetch_page(ref_id):137    url = f"https://api.notion.com/v1/databases/{DATABASE_IDENTIFIER}/query"138    # list database pages139    payload = {140        "page_size": 1,141        "filter": {142            'property': 'Reference ID',143            'rich_text': {"equals": ref_id},144        },145    }146    headers = {147        "Accept": "application/json",148    
    "Notion-Version": "2022-06-28",149        "Content-Type": "application/json",150        "Authorization": f"Bearer {NOTION_TOKEN}"151    }152    153    response = requests.post(url, json=payload, headers=headers)154    155    response = json.loads(response.text)156    #  pprint.pprint(response)157    try:158        if len(response['results']) > 0:159            return response['results'][0]['id']160    except:161        return -1162    return -1163def clean_str(string):164    string = string.strip()165    string = string.replace('\n', ' ')166    string = string.replace(r'\"a', 'ä')167    string = string.replace(r'\"e', 'ë')168    string = string.replace(r'\"i', 'ï')169    string = string.replace(r'\"o', 'ö')170    string = string.replace(r'\"u', 'ü')171    string = string.replace(r'\'a', 'á')172    string = string.replace(r'\'e', 'é')173    string = string.replace(r'\'i', 'Ã')174    string = string.replace(r'\'o', 'ó')175    string = string.replace(r'\'u', 'ú')176    string = string.replace(r'\^a', 'â')177    string = string.replace(r'\^e', 'ê')178    string = string.replace(r'\^i', 'î')179    string = string.replace(r'\^o', 'ô')180    string = string.replace(r'\^u', 'û')181    string = string.replace(r'\`a', 'à')182    string = string.replace(r'\`e', 'è')183    string = string.replace(r'\`i', 'ì')184    string = string.replace(r'\`o', 'ò')185    string = string.replace(r'\`u', 'ù')186    string = ' '.join([w.title() if w.islower() else w for w in string.split()])187    string = string.replace('{', '')188    string = string.replace('}', '')189    return string190def main():191    print("Instantiating the parse")192    # instantiate the parser193    parser = bibtexparser.bparser.BibTexParser()194    parser.ignore_nonstandard_types = True195    parser.homogenize_fields = False196    parser.interpolate_strings = False197    print("Opening .bib file")198    with open(BIB_PATH) as bib_file:199        bibliography = bibtexparser.load(bib_file, parser=parser)200    
print("Opening archive file")201    if os.path.exists(ARCHIVE_PATH):202        with open(ARCHIVE_PATH, 'rb') as archive_file:203            archive = pickle.load(archive_file)204    else:205        archive = []206    archive_ids = [e['ID'] for e in archive]207    print("Adding entries to notion")208    # add each entry to notion database209    update_archive = False210    for entry in reversed(bibliography.entries):211        title = entry.get('title', '')212        title = clean_str(title)213        title = title214        authors = entry.get('author', '')215        authors = authors.replace(' and ', '; ')216        authors = clean_str(authors)217        authors = authors if len(authors) < 100 else authors[:100]218        219        year = entry.get('year', '')220        link = entry.get('url', None)221        ref_id = entry.get('ID')222        if ref_id not in archive_ids: # new page223            notion_add_entry(224                title=title,225                authors=authors,226                year=year,227                ref_id=ref_id,228                link=link,229            )230            update_archive = True231        elif entry not in archive: # update existing page232            page_id = notion_fetch_page(ref_id)233            if page_id != -1:234                notion_update_page(235                    page_id=page_id,236                    title=title,237                    authors=authors,238                    year=year,239                    ref_id=ref_id,240                    link=link,241                )242                update_archive = True243    # only update the archive if necessary244    if update_archive:245        with open(ARCHIVE_PATH, 'wb') as archive_file:246            archive = pickle.dump(bibliography.entries, archive_file)247    else:248        print("Did not update archive")249if __name__ == "__main__":...Learn to execute automation testing from scratch with LambdaTest Learning Hub. 
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
