How to use update_archive method in localstack

Best Python code snippet using localstack_python

deltamc.py

Source:deltamc.py Github

copy

Full Screen

...100if (gui):101 tkinst = Gui(root)102transfer_tkinst()103cprint("You are running " + sys.platform)104update_archive(True)105if(gui):106 delta_gui.updateinst()107 tkinst.update_modlist()108check_for_updates()109cprint("DeltaMC v"+version)110instance = delta_gui.instance111if (args.instance != "None"):112 instance = args.instance113if(instance == "@ERROR@"):114 instance = delta_gui.instance115cprint("Selected Instance: "+instance)116cprint("Minecraft Version: "+mc_version)117upgradesavailable = delta_upgrade.get_upgrades(instance)118if (upgradesavailable == []):119 pass120else:121 cprint("The following upgrades are available for instance "+instance+":")122 for upgrade in upgradesavailable:123 cprint(" "+upgrade[0].name+" (current version: "+upgrade[1].versions[0]["Version"]+", you have: "+upgrade[0].versions[0]["Version"]+")")124if (args.install != "None"):125 delta_install.install_mod(args.install)126 sys.exit()127if (args.remove != "None"):128 delta_remove.remove_mod(args.remove)129 sys.exit()130if (args.upgrade != "None"):131 delta_upgrade.upgrade_mod(args.upgrade)132 sys.exit()133if (args.info != "None"):134 get_info(args.info, output=True)135 sys.exit()136if (args.export != "None"):137 delta_importexport.export_mods(args.export)138 sys.exit()139if (args.importa != "None"):140 delta_importexport.import_mods(args.importa)141 sys.exit()142if (gui == False):143 print_help()144def parsecmd(command):145 if(command.split(" ")[0] == "update"):146 update_archive()147 elif(command.split(" ")[0] == "upgrades"):148 if(len(command.split(" ")) == 2 and command.split(" ")[1] != ""):149 inst = command.split(" ")[1]150 if(inst == "*"):151 inst = None152 if (not instance_exists(inst) and inst != None):153 cprint("Instance "+inst+" does not exist.")154 return155 update_archive()156 delta_upgrade.check_upgrades(True, inst)157 elif(len(command.split(" ")) == 1):158 inst = input("Enter instance name: ")159 if(inst == "*"):160 inst = None161 if (not instance_exists(inst) and inst 
!= None):162 cprint("Instance "+inst+" does not exist.")163 return164 update_archive()165 delta_upgrade.check_upgrades(True, inst)166 else:167 cprint("Invalid command syntax.")168 elif(command.split(" ")[0] == "upgrade"):169 if(len(command.split(" ")) == 2 and command.split(" ")[1] != ""):170 mod = command.split(" ")[1]171 update_archive()172 upgrades = delta_upgrade.get_upgrades()173 delta_upgrade.upgrade_mod(mod)174 elif(len(command.split(" ")) == 1):175 mod = None176 update_archive()177 upgrades = delta_upgrade.get_upgrades()178 delta_upgrade.upgrade_mod(mod)179 else:180 cprint("Invalid command syntax.")181 elif(command.split(" ")[0] == "upgradeall"):182 if(len(command.split(" ")) == 2 and command.split(" ")[1] != ""):183 inst = command.split(" ")[1]184 if(inst == "*"):185 inst = None186 if (not instance_exists(inst) and inst != None):187 cprint("Instance "+inst+" does not exist.")188 return189 elif(len(command.split(" ")) == 1):190 inst = input("Enter instance name: ")191 if(inst == "*"):192 inst = None193 if (not instance_exists(inst) and inst != None):194 cprint("Instance "+inst+" does not exist.")195 return196 else:197 cprint("Invalid command syntax.")198 update_archive()199 updates = delta_upgrade.get_upgrades(inst)200 if(len(updates) == 0):201 cprint("No upgrades available.")202 else:203 for update in updates:204 delta_upgrade.upgrade_mod(update[0]["Name"])205 elif(command.split(" ")[0] == "install"):206 if(len(command.split(" ")) == 3 and command.split(" ")[2] != ""):207 mod = command.split(" ")[1]208 mod_version = command.split(" ")[2]209 update_archive()210 delta_install.install_mod(mod, version)211 elif(len(command.split(" ")) == 2 and command.split(" ")[1] != ""):212 mod = command.split(" ")[1]213 update_archive()214 delta_install.install_mod(mod)215 elif(len(command.split(" ")) == 1):216 mod = None217 update_archive()218 delta_install.install_mod(mod)219 else:220 cprint("Invalid command syntax.")221 cprint(len(command.split(" ")))222 
elif(command.split(" ")[0] == "remove"):223 if(len(command.split(" ")) == 2 and command.split(" ")[1] != ""):224 mod = command.split(" ")[1]225 delta_remove.remove_mod(mod)226 elif(len(command.split(" ")) == 1):227 mod = None228 delta_remove.remove_mod(mod)229 else:230 cprint("Invalid command syntax.")231 elif(command.split(" ")[0] == "info"):232 if(len(command.split(" ")) == 2 and command.split(" ")[1] != ""):233 mod = command.split(" ")[1]234 get_info(mod, output=True)235 elif(len(command.split(" ")) == 1):236 mod = None237 get_info(mod, output=True)238 elif(command.split(" ")[0] == "installm" or command.split(" ")[0] == "installmany"):239 if(len(command.split(" ")) >= 2):240 modslist = command.split(" ")[1:] # separate mod names with spaces241 update_archive()242 string = "Attempting to install: "243 for item in modslist:244 string = string + item+", "245 cprint(string[:-2]+"...") # [:-2] to cut off the extra ", " after the last element246 for item in modslist:247 delta_install.install_mod(item)248 else:249 cprint("Invalid command syntax.")250 elif(command.split(" ")[0] == "removem" or command.split(" ")[0] == "removemany"):251 if(len(command.split(" ")) >= 2):252 modslist = command.split(" ")[1:] # separate mod names with spaces253 update_archive()254 string = "Attempting to remove: "255 for item in modslist:256 string = string + item+", "257 cprint(string[:-2]+"...") # [:-2] to cut off the extra ", " after the last element258 for item in modslist:259 delta_remove.remove_mod(item)260 else:261 cprint("Invalid command syntax.")262 elif(command.split(" ")[0] == "upgradem" or command.split(" ")[0] == "upgrademany"):263 if(len(command.split(" ")) >= 2):264 modslist = command.split(" ")[1:] # separate mod names with spaces265 update_archive()266 string = "Attempting to upgrade: "267 for item in modslist:268 string = string + item+", "269 cprint(string[:-2]+"...") # [:-2] to cut off the extra ", " after the last element270 for item in modslist:271 
delta_upgrade.upgrade_mod(item)272 else:273 cprint("Invalid command syntax.")274 elif(command.split(" ")[0] == "export"):275 if(len(command.split(" ")) == 2 and command.split(" ")[1] != ""):276 name = command.split(" ")[1]277 update_archive()278 delta_importexport.export_mods(name)279 elif(len(command.split(" ")) == 1):280 name = None281 update_archive()282 delta_importexport.export_mods(name)283 else:284 cprint("Invalid command syntax.")285 elif(command.split(" ")[0] == "import"):286 if(len(command.split(" ")) == 2 and command.split(" ")[1] != ""):287 path = command.split(" ")[1]288 update_archive()289 delta_importexport.import_mods(path)290 elif(len(command.split(" ")) == 1):291 path = None292 update_archive()293 delta_importexport.import_mods(path)294 else:295 cprint("Invalid command syntax.")296 elif(command.split(" ")[0] == "instance" or command.split(" ")[0] == "inst"):297 if(len(command.split(" ")) == 2 and command.split(" ")[1] != ""):298 name = command.split(" ")[1]299 if(instance_exists(name)):300 if(name == instance):301 cprint("Instance "+name+" already selected!")302 else:303 setup_config(name)304 cprint("Switched to instance "+name+".")305 else:306 cprint("Instance "+name+" does not exist.")...

Full Screen

Full Screen

core.py

Source:core.py Github

copy

Full Screen

1""" Base class implementation for core features present in the updater module.2This is the updater core class which provides all facilities other implementation should rely in.3This class should not be used directly, use a derived class instead.4"""5import contextlib6import io7import os8import platform9import zipfile10import logging11import json12import urllib.request13from pubsub import pub # type: ignore14from typing import Optional, Dict, Tuple, Union, Any15from . import paths16log = logging.getLogger("updater.core")17class UpdaterCore(object):18 """ Base class for all updater implementations.19 Implementations must add user interaction methods and call logic for all methods present in this class.20 """21 def __init__(self, endpoint: str, current_version: str, app_name: str = "", password: Optional[bytes] = None) -> None:22 """ 23 :param endpoint: The URl endpoint where the module should retrieve update information. It must return a json valid response or a non 200 HTTP status code.24 :type endpoint: str25 :param current_version: Application's current version.26 :type current_version: str27 :param app_name: Name of the application.28 (default is empty)29 :type app_name: str30 :param password: Password for update zipfile.31 :type password: bytes32 """33 self.endpoint = endpoint34 self.current_version = current_version35 self.app_name = app_name36 self.password = password37 def get_update_information(self) -> Dict[str, Any]:38 """ Calls the provided URL endpoint and returns information about the available update sent by the server. 
The format should adhere to the json specifications for updates.39 If the server returns a status code different to 200 or the json file is not valid, this will raise either a :py:exc:`urllib.error.HTTPError` or a :external:py:exc:`json.JSONDecodeError`.40 :rtype: dict41 """42 response = urllib.request.urlopen(self.endpoint)43 data: str = response.read()44 content: Dict[str, Any] = json.loads(data)45 return content46 def get_version_data(self, content: Dict[str, Any]) -> Tuple[Union[bool, str], Union[bool, str], Union[bool, str]]:47 """ Parses the dictionary returned by :py:func:`updater.core.updaterCore.get_update_information` and, if there is a new update available, returns information about it in a tuple.48 the module checks whether :py:attr:`updater.core.updaterCore.current_version` is different to the version reported in the update file, and the json specification file contains a binary link for the user's architecture. If both of these conditions are True, a tuple is returned with (new_version, update_description, update_url).49 If there is no update available, a tuple with Falsy values is returned.50 This method can raise a KeyError if there are no updates for the current architecture defined in the update file.51 :returns: tuple with update information or False values.52 :rtype: tuple53 """54 available_version = content["current_version"]55 update_url_key = platform.system()+platform.architecture()[0][:2]56 if available_version == self.current_version:57 return (False, False, False)58 if content["downloads"].get(update_url_key) == None:59 log.error("Update file doesn't include architecture {}".format(update_url_key))60 raise KeyError("Update file doesn't include current architecture.")61 available_description = content["description"]62 update_url = content ['downloads'][update_url_key]63 return (available_version, available_description, update_url)64 def download_update(self, update_url: str, update_destination: str, chunk_size: int = 
io.DEFAULT_BUFFER_SIZE) -> str:65 """ Downloads an update URL and notifies all subscribers of the download progress.66 This function will send a pubsub notification every time the download progress updates by using :py:func:`pubsub.pub.sendMessage` under the topic "updater.update-progress".67 You might subscribe to this notification by using :py:func:`pubsub.pub.subscribe` with a function with this signature:68 ``def receive_progress(total_downloaded: int, total_size: int):``69 In this function, it is possible to update the UI progress bar if needed.70 Don't forget to call :py:func:`pubsub.pub.unsubscribe` at the end of the update.71 :param update_url: Direct link to update zip file.72 :type update_url: str73 :param update_destination: Destination path to save the update file74 :type update_destination: str75 :param chunk_size: chunk size for downloading the update (default to :py:data:`io.DEFAULT_BUFFER_SIZE`)76 :type chunk_size: int77 :returns: The update file path in the system.78 :rtype: str79 """80 def _download_callback(transferred_blocks, block_size, total_size):81 total_downloaded = transferred_blocks*block_size82 pub.sendMessage("updater.update-progress", total_downloaded=total_downloaded, total_size=total_size)83 filename, headers = urllib.request.urlretrieve(update_url, update_destination, _download_callback)84 log.debug("Update downloaded")85 return update_destination86 def extract_update(self, update_archive: str, destination: str) -> str:87 """ Given an update archive, extracts it. Returns the directory to which it has been extracted.88 :param update_archive: Path to the update file.89 :type update_archive: str90 :param destination: Path to extract the archive. 
User must have permission to do file operations on the path.91 :type destination: str92 :returns: Path where the archive has been extracted.93 :rtype: str94 """95 with contextlib.closing(zipfile.ZipFile(update_archive)) as archive:96 if self.password:97 archive.setpassword(self.password)98 archive.extractall(path=destination)99 log.debug("Update extracted")100 return destination101 def move_bootstrap(self, extracted_path: str) -> str:102 """ Moves the bootstrapper binary from the update extraction folder to a working path, so it will be able to perform operations under the update directory later.103 :param extracted_path: Path to which the update file has been extracted.104 :type extracted_path: str105 :returns: The path to the bootstrap binary to be run to actually perform the update.106 :rtype: str107 """108 working_path = os.path.abspath(os.path.join(extracted_path, '..'))109 if platform.system() == 'Darwin':110 extracted_path = os.path.join(extracted_path, 'Contents', 'Resources')111 downloaded_bootstrap = os.path.join(extracted_path, self.bootstrap_name())112 new_bootstrap_path = os.path.join(working_path, self.bootstrap_name())113 os.rename(downloaded_bootstrap, new_bootstrap_path)114 return new_bootstrap_path115 def execute_bootstrap(self, bootstrap_path: str, source_path: str) -> None:116 """ Executes the bootstrapper binary, which will move the files from the update directory to the app folder, finishing with the update process.117 :param bootstrap_path: Path to the bootstrap binary that will perform the update, as returned by :py:func:`move_bootstrap`118 :type bootstrap_path: str119 :param source_path: Path where the update file was extracted, as returned by :py:func:`extract_update`120 :type source_path: str121 """122 arguments = r'"%s" "%s" "%s" "%s"' % (os.getpid(), source_path, paths.app_path(), paths.get_executable())123 if platform.system() == 'Windows':124 import win32api # type: ignore125 win32api.ShellExecute(0, 'open', bootstrap_path, arguments, 
'', 5)126 else:127 import subprocess128 self.make_executable(bootstrap_path)129 subprocess.Popen(['%s %s' % (bootstrap_path, arguments)], shell=True)130 log.info("Bootstrap executed")131 def bootstrap_name(self) -> str:132 """ Returns the name of the bootstrapper, based in user platform.133 :rtype: str134 """135 if platform.system() == 'Windows':136 return 'bootstrap.exe'137 elif platform.system() == 'Darwin':138 return 'bootstrap-mac.sh'139 return 'bootstrap-lin.sh'140 def make_executable(self, path: str) -> None:141 """ Set execution permissions in a script on Unix platforms. """142 import stat143 st = os.stat(path)...

Full Screen

Full Screen

sync.py

Source:sync.py Github

copy

Full Screen

1import bibtexparser2import json3import os4import pickle5import pprint6import requests7"""8TODO:9- Default icon for papers.10"""11ARCHIVE_PATH = 'archive.pk'12BIB_PATH = 'references.bib'13NOTION_TOKEN = os.environ['NOTION_TOKEN']14DATABASE_IDENTIFIER = os.environ['DATABASE_IDENTIFIER']15def notion_add_entry(16 title='',17 authors='',18 year='0',19 ref_id='',20 link='',21):22 url = "https://api.notion.com/v1/pages"23 payload = {24 "parent": {25 'database_id': DATABASE_IDENTIFIER,26 },27 "properties": {28 'Title': {29 'title': [{30 'text': {31 'content': title,32 }33 }]34 },35 'Authors': {36 "rich_text": [{37 "type": "text",38 "text": {39 "content": authors,40 }41 }],42 },43 'Year': {44 "rich_text": [{45 "type": "text",46 "text": {47 "content": year,48 }49 }],50 },51 'Reference ID': {52 "rich_text": [{53 "type": "text",54 "text": {55 "content": ref_id,56 }57 }],58 },59 'Link': {60 "url": link,61 },62 63 64 },65 }66 # pprint.pprint(payload)67 headers = {68 "Accept": "application/json",69 "Notion-Version": "2022-06-28",70 "Content-Type": "application/json",71 "Authorization": f"Bearer {NOTION_TOKEN}"72 }73 response = requests.post(url, json=payload, headers=headers)74 # pprint.pprint(json.loads(response.text))75def notion_update_page(76 page_id,77 title='',78 authors='',79 year='0',80 ref_id='',81 link='',82):83 url = f"https://api.notion.com/v1/pages/{page_id}"84 payload = {85 "parent": {86 'database_id': DATABASE_IDENTIFIER,87 },88 "properties": {89 'Title': {90 'title': [{91 'text': {92 'content': title,93 }94 }]95 },96 'Authors': {97 "rich_text": [{98 "type": "text",99 "text": {100 "content": authors,101 }102 }],103 },104 'Year': {105 "rich_text": [{106 "type": "text",107 "text": {108 "content": year,109 }110 }],111 },112 'Reference ID': {113 "rich_text": [{114 "type": "text",115 "text": {116 "content": ref_id,117 }118 }],119 },120 'Link': {121 "url": link,122 },123 124 125 },126 }127 # pprint.pprint(payload)128 headers = {129 "Accept": "application/json",130 
"Notion-Version": "2022-06-28",131 "Content-Type": "application/json",132 "Authorization": f"Bearer {NOTION_TOKEN}"133 }134 response = requests.patch(url, json=payload, headers=headers)135 print(response)136def notion_fetch_page(ref_id):137 url = f"https://api.notion.com/v1/databases/{DATABASE_IDENTIFIER}/query"138 # list database pages139 payload = {140 "page_size": 1,141 "filter": {142 'property': 'Reference ID',143 'rich_text': {"equals": ref_id},144 },145 }146 headers = {147 "Accept": "application/json",148 "Notion-Version": "2022-06-28",149 "Content-Type": "application/json",150 "Authorization": f"Bearer {NOTION_TOKEN}"151 }152 153 response = requests.post(url, json=payload, headers=headers)154 155 response = json.loads(response.text)156 # pprint.pprint(response)157 try:158 if len(response['results']) > 0:159 return response['results'][0]['id']160 except:161 return -1162 return -1163def clean_str(string):164 string = string.strip()165 string = string.replace('\n', ' ')166 string = string.replace(r'\"a', 'ä')167 string = string.replace(r'\"e', 'ë')168 string = string.replace(r'\"i', 'ï')169 string = string.replace(r'\"o', 'ö')170 string = string.replace(r'\"u', 'ü')171 string = string.replace(r'\'a', 'á')172 string = string.replace(r'\'e', 'é')173 string = string.replace(r'\'i', 'í')174 string = string.replace(r'\'o', 'ó')175 string = string.replace(r'\'u', 'ú')176 string = string.replace(r'\^a', 'â')177 string = string.replace(r'\^e', 'ê')178 string = string.replace(r'\^i', 'î')179 string = string.replace(r'\^o', 'ô')180 string = string.replace(r'\^u', 'û')181 string = string.replace(r'\`a', 'à')182 string = string.replace(r'\`e', 'è')183 string = string.replace(r'\`i', 'ì')184 string = string.replace(r'\`o', 'ò')185 string = string.replace(r'\`u', 'ù')186 string = ' '.join([w.title() if w.islower() else w for w in string.split()])187 string = string.replace('{', '')188 string = string.replace('}', '')189 return string190def main():191 print("Instantiating the 
parse")192 # instantiate the parser193 parser = bibtexparser.bparser.BibTexParser()194 parser.ignore_nonstandard_types = True195 parser.homogenize_fields = False196 parser.interpolate_strings = False197 print("Opening .bib file")198 with open(BIB_PATH) as bib_file:199 bibliography = bibtexparser.load(bib_file, parser=parser)200 print("Opening archive file")201 if os.path.exists(ARCHIVE_PATH):202 with open(ARCHIVE_PATH, 'rb') as archive_file:203 archive = pickle.load(archive_file)204 else:205 archive = []206 archive_ids = [e['ID'] for e in archive]207 print("Adding entries to notion")208 # add each entry to notion database209 update_archive = False210 for entry in reversed(bibliography.entries):211 title = entry.get('title', '')212 title = clean_str(title)213 title = title214 authors = entry.get('author', '')215 authors = authors.replace(' and ', '; ')216 authors = clean_str(authors)217 authors = authors if len(authors) < 100 else authors[:100]218 219 year = entry.get('year', '')220 link = entry.get('url', None)221 ref_id = entry.get('ID')222 if ref_id not in archive_ids: # new page223 notion_add_entry(224 title=title,225 authors=authors,226 year=year,227 ref_id=ref_id,228 link=link,229 )230 update_archive = True231 elif entry not in archive: # update existing page232 page_id = notion_fetch_page(ref_id)233 if page_id != -1:234 notion_update_page(235 page_id=page_id,236 title=title,237 authors=authors,238 year=year,239 ref_id=ref_id,240 link=link,241 )242 update_archive = True243 # only update the archive if necessary244 if update_archive:245 with open(ARCHIVE_PATH, 'wb') as archive_file:246 archive = pickle.dump(bibliography.entries, archive_file)247 else:248 print("Did not update archive")249if __name__ == "__main__":...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful