Best Python code snippet using refurb_python
adobe_activate_api.py
Source:adobe_activate_api.py  
# ... (earlier lines of adobe_activate_api.py are not part of this excerpt)


def save_settings(settings):
    """Write the `settings` dict as indented JSON to the add-on settings file."""
    settings_file = os.path.join(ADDON_PATH_PROFILE, SETTINGS_FILE)
    with open(settings_file, 'w') as fp:
        json.dump(settings, fp, sort_keys=False, indent=4)


def load_settings():
    """Return the settings dict stored in the add-on settings file.

    If the file does not exist yet, an empty settings file is written first.
    """
    settings_file = os.path.join(ADDON_PATH_PROFILE, SETTINGS_FILE)
    if not os.path.isfile(settings_file):
        save_settings({})
    with open(settings_file, 'r') as fp:
        return json.load(fp)


def get_device_id():
    """Return the stored device id, generating and saving one on first use."""
    settings = load_settings()
    if 'device_id' not in settings:
        settings['device_id'] = str(uuid.uuid1())
        save_settings(settings)
    return settings['device_id']


def read_response(resp):
    """Read `resp`, un-gzip it when Content-Encoding says so, and parse JSON."""
    if resp.info().get('Content-Encoding') == 'gzip':
        buf = StringIO(resp.read())
        gzip_file = gzip.GzipFile(fileobj=buf)
        # try/finally instead of `with` so the handle is always released,
        # including on interpreters where GzipFile is not a context manager.
        try:
            content = gzip_file.read()
        finally:
            gzip_file.close()
    else:
        content = resp.read()
    return json.loads(content)


def is_expired(expiration):
    """Return True when `expiration` (epoch milliseconds) is now or in the past."""
    return (time.time() * 1000) >= int(expiration)


def get_url_response(url, message, body = None, method = None):
    """Open `url` with `message` as the Authorization header.

    For method 'DELETE' the raw response object is returned; for everything
    else the response is decoded by read_response() and the parsed JSON is
    returned. Cookies are saved after the request either way.
    """
    # xbmc.log(TAG + 'url %s message %s' % (url, message), xbmc.LOGDEBUG)
    cj = get_cookie_jar()
    opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
    opener.addheaders = [("Accept", "application/json"),
                         ("Accept-Encoding", "gzip, deflate"),
                         ("Accept-Language", "en-us"),
                         ("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8"),
                         ("Connection", "close"),
                         ("User-Agent", UA_ATV),
                         ("Authorization", message)]
    if method == 'DELETE':
        request = urllib2.Request(url)
        # urllib2 has no verb argument; override get_method to force DELETE.
        request.get_method = lambda: method
        resp = opener.open(request)
    else:
        resp = opener.open(url, body)
        resp = read_response(resp)
    save_cookies(cj)
    return resp


def generate_message(method, path):
    """Build the signed Authorization header value for `method` and `path`.

    The message carries a fresh nonce and the current time in milliseconds
    and is signed with HMAC-SHA1; the base64 signature is appended.
    """
    nonce = str(uuid.uuid4())
    today = str(int(time.time() * 1000))
    # NOTE(review): the signing key and public key are hard-coded here.
    key = 'gB8HYdEPyezeYbR1'
    message = (method + ' requestor_id=ESPN, nonce=' + nonce +
               ', signature_method=HMAC-SHA1, request_time=' + today +
               ', request_uri=' + path)
    signature = hmac.new(key, message, hashlib.sha1)
    signature = base64.b64encode(signature.digest())
    message = message + ', public_key=yKpsHYd8TOITdTMJHmkJOVmgbb2DykNK, signature=' + signature
    return message


def is_reg_code_valid():
    """Return True when a cached registration code exists and is not expired."""
    settings = load_settings()
    if 'generateRegCode' not in settings:
        xbmc.log(TAG + 'Unable to find reg code', xbmc.LOGDEBUG)
        return False
    # Check code isn't expired
    expiration = settings['generateRegCode']['expires']
    if is_expired(expiration):
        xbmc.log(TAG + 'Reg code is expired at %s' % expiration, xbmc.LOGDEBUG)
        return False
    return True


# Gets called when the user wants to authorize this device, it returns a registration code to enter
# on the activation website page
# Sample : '{"id":"","code":"","requestor":"ESPN","generated":1463616806831,
# "expires":1463618606831,"info":{"deviceId":"","deviceType":"appletv","deviceUser":null,
# "appId":null,"appVersion":null,"registrationURL":null}}'
# (generateRegCode)
def get_regcode():
    """Return a registration code, reusing the cached one while it is valid.

    Otherwise a new code is requested, the response is stored under
    'generateRegCode' in the settings, and its 'code' is returned.
    """
    if is_reg_code_valid():
        xbmc.log(TAG + 'Loading reg code from cache', xbmc.LOGDEBUG)
        return load_settings()['generateRegCode']['code']
    params = urllib.urlencode(
        {'deviceId': get_device_id(),
         'deviceType': 'appletv',
         'ttl': '1800'})
    path = '/regcode'
    url = urlparse.urlunsplit(['https', 'api.auth.adobe.com',
                               'reggie/v1/ESPN' + path,
                               params, ''])
    message = generate_message('POST', path)
    # NOTE(review): the empty dict is passed as the request body; presumably
    # only there so urllib2 issues a POST (non-None data) - confirm.
    resp = get_url_response(url, message, dict())
    settings = load_settings()
    settings['generateRegCode'] = resp
    save_settings(settings)
    return resp['code']


# Authenticates the user after they have been authenticated on the activation website (authenticateRegCode)
# Sample: '{"mvpd":"","requestor":"ESPN","userId":"","expires":"1466208969000"}'
def authenticate():
    """Exchange the current registration code for an authentication record.

    The response is stored under 'authenticateRegCode' in the settings.

    Raises:
        ValueError: if there is no valid (unexpired) registration code.
    """
    if not is_reg_code_valid():
        xbmc.log(TAG + 'reg code is invalid', xbmc.LOGDEBUG)
        raise ValueError('Registration code is invalid, please restart the authentication process')
    reg_code = get_regcode()
    params = urllib.urlencode({'requestor': 'ESPN'})
    path = '/authenticate/' + reg_code
    url = urlparse.urlunsplit(['https', 'api.auth.adobe.com',
                               'api/v1' + path,
                               params, ''])
    message = generate_message('GET', path)
    resp = get_url_response(url, message)
    settings = load_settings()
    settings['authenticateRegCode'] = resp
    save_settings(settings)


# Get authn token (re-auth device after it expires), getAuthnToken
def re_authenticate():
    """Fetch a fresh authn token and drop all cached authorizations.

    The response replaces 'authenticateRegCode' in the settings and any
    'authorize' entry is removed.
    """
    params = urllib.urlencode({'requestor': 'ESPN',
                               'deviceId': get_device_id()})
    path = '/tokens/authn'
    url = urlparse.urlunsplit(['https', 'api.auth.adobe.com',
                               'api/v1' + path,
                               params, ''])
    message = generate_message('GET', path)
    resp = get_url_response(url, message)
    settings = load_settings()
    settings['authenticateRegCode'] = resp
    if 'authorize' in settings:
        del settings['authorize']
    save_settings(settings)


def get_resource(channel, event_name, event_guid, event_parental_rating):
    """Return the RSS resource string built from the given event fields."""
    return ('<rss version="2.0" xmlns:media="http://search.yahoo.com/mrss/"><channel><title><![CDATA[' +
            channel + "]]></title><item><title><![CDATA[" +
            event_name + "]]></title><guid><![CDATA[" +
            event_guid + ']]></guid><media:rating scheme="urn:v-chip"><![CDATA[' +
            event_parental_rating + "]]></media:rating></item></channel></rss>")


# Sample '{"resource":"resource","mvpd":"","requestor":"ESPN","expires":"1463621239000"}'
def authorize(resource):
    """Authorize `resource` unless an unexpired authorization is cached.

    The response is stored in settings['authorize'] under the resource
    re-encoded from iso-8859-1 to utf-8.
    """
    if is_authorized(resource):
        xbmc.log(TAG + 'already authorized', xbmc.LOGDEBUG)
        return
    params = urllib.urlencode({'requestor': 'ESPN',
                               'deviceId': get_device_id(),
                               'resource': resource})
    path = '/authorize'
    url = urlparse.urlunsplit(['https', 'api.auth.adobe.com',
                               'api/v1' + path,
                               params, ''])
    message = generate_message('GET', path)
    resp = get_url_response(url, message)
    settings = load_settings()
    if 'authorize' not in settings:
        settings['authorize'] = {}
    xbmc.log(TAG + 'resource %s' % resource, xbmc.LOGDEBUG)
    settings['authorize'][resource.decode('iso-8859-1').encode('utf-8')] = resp
    save_settings(settings)


def deauthorize():
    """Log the device out and remove cached authorization/authentication data."""
    params = urllib.urlencode({'deviceId': get_device_id()})
    path = '/logout'
    url = urlparse.urlunsplit(['https', 'api.auth.adobe.com',
                               'api/v1' + path,
                               params, ''])
    message = generate_message('DELETE', path)
    # The response of the logout call is not used.
    get_url_response(url, message, body = None, method = 'DELETE')
    settings = load_settings()
    if 'authorize' in settings:
        del settings['authorize']
    if 'authenticateRegCode' in settings:
        del settings['authenticateRegCode']
    save_settings(settings)


# getShortMediaToken
# Sample '{"mvpdId":"","expires":"1463618218000","serializedToken":"+++++++=","userId":"",
# "requestor":"ESPN","resource":" resource"}'
def get_short_media_token(resource):
    """Return the serialized short media token for `resource`.

    Re-authenticates first when the authentication has expired, authorizes
    the resource, then requests the token. A 401 response triggers one
    re-authentication and a single retry; any other HTTP error propagates.
    The response is stored under 'getShortMediaToken' in the settings.
    """
    if has_to_reauthenticate():
        xbmc.log(TAG + 're-authenticating device', xbmc.LOGDEBUG)
        re_authenticate()
    authorize(resource)
    params = urllib.urlencode({'requestor': 'ESPN',
                               'deviceId' : get_device_id(),
                               'resource' : resource})
    path = '/mediatoken'
    url = urlparse.urlunsplit(['https', 'api.auth.adobe.com',
                               'api/v1' + path,
                               params, ''])
    message = generate_message('GET', path)
    try:
        resp = get_url_response(url, message)
    except urllib2.HTTPError as exception:
        if exception.code == 401:
            xbmc.log(TAG + 'Unauthorized exception, trying again', xbmc.LOGDEBUG)
            re_authenticate()
            resp = get_url_response(url, message)
        else:
            # Bare raise keeps the original traceback intact.
            raise
    settings = load_settings()
    settings['getShortMediaToken'] = resp
    save_settings(settings)
    return resp['serializedToken']


def is_authenticated():
    """Return True when the settings contain an 'authenticateRegCode' entry."""
    settings = load_settings()
    return 'authenticateRegCode' in settings


def has_to_reauthenticate():
    """Return True when the stored authentication has expired.

    Raises KeyError if no 'authenticateRegCode' entry is stored.
    """
    settings = load_settings()
    return is_expired(settings['authenticateRegCode']['expires'])


def is_authorized(resource):
    """Return True when `resource` has a cached, unexpired authorization."""
    settings = load_settings()
    if 'authorize' not in settings:
        return False
    # Same key transformation authorize() uses when it stores the response.
    key = resource.decode('iso-8859-1').encode('utf-8')
    if key not in settings['authorize']:
        return False
    return not is_expired(settings['authorize'][key]['expires'])


def get_expires_time(key):
    """Return settings[key]['expires'] formatted as local 'YYYY-MM-DD HH:MM'."""
    settings = load_settings()
    expires = settings[key]['expires']
    expires_time = time.localtime(int(expires) / 1000)
    return time.strftime('%Y-%m-%d %H:%M', expires_time)


def get_authentication_expires():
    """Return the formatted expiry time of the stored authentication."""
    return get_expires_time('authenticateRegCode')


def get_authorization_expires():
    """Return the formatted expiry time read from settings['authorize'].

    NOTE(review): authorize() stores settings['authorize'] as a mapping of
    resource -> response, so the ['expires'] lookup done here looks like it
    would raise KeyError - confirm the intended behaviour with the caller.
    """
    return get_expires_time('authorize')


def clean_up_authorization_tokens():
    """Remove authorizations that are expired or carry no 'expires' field."""
    settings = load_settings()
    if 'authorize' in settings:
        authorizations = settings['authorize']
        # Collect first, delete afterwards: never mutate a dict while iterating it.
        keys_to_delete = [key for key in authorizations
                          if 'expires' not in authorizations[key]
                          or is_expired(authorizations[key]['expires'])]
        for key in keys_to_delete:
            del authorizations[key]
    save_settings(settings)


def get_user_metadata():
    params = urllib.urlencode({'requestor': 'ESPN',
                               'deviceId' : get_device_id()})
    # ... (the rest of this function is truncated in the excerpt)


# prompt.py
Source:prompt.py  
...34    """Prompt is used for prompting users for input used in deploying Gluu.35    """36    def __init__(self):37        self.settings = SettingsHandler()38    def load_settings(self):39        self.settings = SettingsHandler()40    def license(self):41        self.load_settings()42        PromptLicense(self.settings)43    def versions(self):44        self.load_settings()45        PromptVersion(self.settings)46    def arch(self):47        self.load_settings()48        arch = PromptArch(self.settings)49        arch.prompt_arch()50    def namespace(self):51        self.load_settings()52        namespace = PromptNamespace(self.settings)53        namespace.prompt_gluu_namespace()54    def optional_services(self):55        self.load_settings()56        optional_services = PromptOptionalServices(self.settings)57        optional_services.prompt_optional_services()58    def jackrabbit(self):59        self.load_settings()60        jackrabbit = PromptJackrabbit(self.settings)61        jackrabbit.prompt_jackrabbit()62    def istio(self):63        self.load_settings()64        istio = PromptIstio(self.settings)65        istio.prompt_istio()66    def test_enviornment(self):67        self.load_settings()68        test_environment = PromptTestEnvironment(self.settings)69        if not self.settings.get("TEST_ENVIRONMENT") and \70                self.settings.get("DEPLOYMENT_ARCH") not in ("microk8s", "minikube"):71            test_environment.prompt_test_environment()72        if self.settings.get("DEPLOYMENT_ARCH") in ("eks", "gke", "do", "local", "aks"):73            if not self.settings.get("NODE_SSH_KEY"):74                test_environment.prompt_ssh_key()75    def network(self):76        if not self.settings.get("HOST_EXT_IP"):77            ip = gather_ip()78            self.load_settings()79            self.settings.set("HOST_EXT_IP", ip)80            if self.settings.get("DEPLOYMENT_ARCH") == "eks" and self.settings.get("USE_ISTIO_INGRESS") != "Y":81                aws 
= PromptAws(self.settings)82                aws.prompt_aws_lb()83    def gke(self):84        self.load_settings()85        if self.settings.get("DEPLOYMENT_ARCH") == "gke":86            gke = PromptGke(self.settings)87            gke.prompt_gke()88    def persistence_backend(self):89        self.load_settings()90        persistence_backend = PromptPersistenceBackend(self.settings)91        persistence_backend.prompt_persistence_backend()92    def ldap(self):93        self.load_settings()94        if self.settings.get("PERSISTENCE_BACKEND") == "hybrid":95            ldap = PromptLdap(self.settings)96            ldap.prompt_hybrid_ldap_held_data()97    def volumes(self):98        self.load_settings()99        volumes = PromptVolumes(self.settings)100        if self.settings.get("PERSISTENCE_BACKEND") in ("hybrid", "ldap") or \101                self.settings.get("INSTALL_JACKRABBIT") == "Y":102            volumes.prompt_volumes()103        volumes.prompt_storage()104    def couchbase(self):105        self.load_settings()106        couchbase = PromptCouchbase(self.settings)107        if not self.settings.get("DEPLOY_MULTI_CLUSTER") and self.settings.get("PERSISTENCE_BACKEND") in (108                "hybrid", "couchbase") and self.settings.get("DEPLOYMENT_ARCH") not in ("microk8s", "minikube"):109            couchbase.prompt_couchbase_multi_cluster()110        if self.settings.get("PERSISTENCE_BACKEND") in ("hybrid", "couchbase"):111            couchbase.prompt_couchbase()112    def cache(self):113        self.load_settings()114        cache = PromptCache(self.settings)115        cache.prompt_cache_type()116    def backup(self):117        self.load_settings()118        if self.settings.get("DEPLOYMENT_ARCH") not in ("microk8s", "minikube"):119            backup = PromptBackup(self.settings)120            backup.prompt_backup()121    def configuration(self):122        self.load_settings()123        configuration = PromptConfiguration(self.settings)124        
configuration.prompt_config()125    def images(self):126        self.load_settings()127        images = PromptImages(self.settings)128        images.prompt_image_name_tag()129    def replicas(self):130        self.load_settings()131        replicas = PromptReplicas(self.settings)132        replicas.prompt_replicas()133    def sql(self):134        self.load_settings()135        if self.settings.get("PERSISTENCE_BACKEND") == "sql":136            spanner = PromptSQL(self.settings)137            spanner.prompt_sql()138            139    def google(self):140        self.load_settings()141        if self.settings.get("PERSISTENCE_BACKEND") == "spanner":142            spanner = PromptGoogle(self.settings)143            spanner.prompt_google()144    def confirm_settings(self):145        self.load_settings()146        if self.settings.get("CONFIRM_PARAMS") != "Y":147            confirm_settings = PromptConfirmSettings(self.settings)148            confirm_settings.confirm_params()149    def prompt(self):150        """Main property: called to setup all prompts and returns prompts in settings file.151        :return:152        """153        self.license()154        self.versions()155        self.arch()156        self.namespace()157        self.optional_services()158        self.jackrabbit()159        self.istio()...Package Syncing.py
Source:Package Syncing.py  
# ... (earlier lines of Package Syncing.py are not part of this excerpt)
log = logger.getLogger(__name__)
q = thread.Queue()


class PkgSyncEnableCommand(sublime_plugin.WindowCommand):
    """Window command that turns syncing on."""

    def is_enabled(self):
        # Only offered while syncing is currently off.
        s = tools.load_settings()
        return not s.get("sync", False)

    def run(self):
        """Persist sync=True, start the watcher and trigger a pull/push sync."""
        s = sublime.load_settings("Package Syncing.sublime-settings")
        s.set("sync", True)
        sublime.save_settings("Package Syncing.sublime-settings")

        # Start watcher
        tools.start_watcher(tools.load_settings())

        # Run pkg_sync
        sublime.run_command("pkg_sync", {"mode": ["pull", "push"]})


class PkgSyncDisableCommand(sublime_plugin.WindowCommand):
    """Window command that turns syncing off."""

    def is_enabled(self):
        # Only offered while syncing is currently on.
        s = tools.load_settings()
        return s.get("sync", False)

    def run(self):
        """Persist sync=False and stop the watcher."""
        s = sublime.load_settings("Package Syncing.sublime-settings")
        s.set("sync", False)
        sublime.save_settings("Package Syncing.sublime-settings")

        # Stop watcher
        tools.stop_watcher()


class PkgSyncCommand(sublime_plugin.ApplicationCommand):
    """Application command that queues a sync run."""

    def is_enabled(self):
        s = tools.load_settings()
        return s.get("sync", False) and s.get("sync_folder", False) != False

    def run(self, mode=None, override=False):
        """Queue a sync thread unless one is already queued under "sync".

        Args:
            mode: list of sync directions; defaults to ["pull", "push"].
            override: passed through to thread.Sync.

        When the configured sync folder is missing or not a directory, an
        error dialog is shown, syncing is disabled and nothing is queued.
        """
        # A None sentinel avoids sharing one mutable default list across calls.
        if mode is None:
            mode = ["pull", "push"]
        log.debug("pkg_sync %s %s", mode, override)

        # Load settings
        settings = sublime.load_settings("Package Syncing.sublime-settings")

        # Check for valid sync_folder; an unset folder is treated as invalid
        # instead of being handed to os.path.isdir().
        sync_folder = settings.get("sync_folder")
        if not sync_folder or not os.path.isdir(sync_folder):
            sublime.error_message("Invalid sync folder \"%s\", sync disabled! Please adjust your sync folder." % sync_folder)
            settings.set("sync", False)
            sublime.save_settings("Package Syncing.sublime-settings")
            return

        # Check if sync is already running
        if not q.has("sync"):
            t = thread.Sync(tools.load_settings(), mode, override)
            q.add(t, "sync")
        else:
            print("Package Syncing: Already running")


class PkgSyncPullItemCommand(sublime_plugin.ApplicationCommand):
    """Application command that pulls a single item."""

    def is_enabled(self):
        s = tools.load_settings()
        return s.get("sync", False) and s.get("sync_folder", False) and os.path.isdir(s.get("sync_folder"))

    def run(self, item):
        log.debug("pkg_sync_pull_item %s", item)

        # Start a thread to pull the current item
        t = thread.Sync(tools.load_settings(), mode=["pull"], item=item)
        q.add(t)


class PkgSyncPushItemCommand(sublime_plugin.ApplicationCommand):
    """Application command that pushes a single item."""

    def is_enabled(self):
        s = tools.load_settings()
        return s.get("sync", False) and s.get("sync_folder", False) and os.path.isdir(s.get("sync_folder"))

    def run(self, item):
        log.debug("pkg_sync_push_item %s", item)

        # Start a thread to push the current item
        t = thread.Sync(tools.load_settings(), mode=["push"], item=item)
        q.add(t)


class PkgSyncFolderCommand(sublime_plugin.WindowCommand):
    """Window command that asks the user for the sync folder."""

    def is_enabled(self):
        # Not offered while a sync is queued under "sync".
        return not q.has("sync")

    def run(self):
        """Show an input panel for the sync folder and apply the chosen path."""
        # Load settings to provide an initial value for the input panel
        settings = sublime.load_settings("Package Syncing.sublime-settings")
        settings.clear_on_change("package_syncing")
        sublime.save_settings("Package Syncing.sublime-settings")

        sync_folder = settings.get("sync_folder")

        # Suggest user dir if nothing set or folder do not exists
        if not sync_folder:
            sync_folder = os.path.expanduser("~")

        def on_done(path):
            if not os.path.isdir(path):
                try:
                    os.makedirs(path)
                except OSError:
                    # The path could not be created; the isdir() check below
                    # then reports it through the "Invalid Path" dialog.
                    pass

            if os.path.isdir(path):
                if os.listdir(path):
                    if sublime.ok_cancel_dialog("The selected folder is not empty, would you like to continue and override your local settings?", "Continue"):
                        override = True
                    else:
                        # Declined: ask again, starting from the same path.
                        self.window.show_input_panel("Sync Folder", path, on_done, None, None)
                        return
                else:
                    override = False

                # Adjust settings
                settings.set("sync", True)
                settings.set("sync_folder", path)

                # Reset last-run file
                file_path = os.path.join(sublime.packages_path(), "User", "Package Control.last-run")
                if os.path.isfile(file_path):
                    os.remove(file_path)

                # Reset last-run file
                file_path = os.path.join(sublime.packages_path(), "User", "Package Syncing.last-run")
                if os.path.isfile(file_path):
                    os.remove(file_path)

                sublime.save_settings("Package Syncing.sublime-settings")
                sublime.status_message("sync_folder successfully set to \"%s\"" % path)

                # Restart watcher
                tools.pause_watcher(local=False)
                tools.stop_watcher(local=False)
                tools.start_watcher(tools.load_settings(), local=False)

                # Run pkg_sync
                sublime.set_timeout(lambda: sublime.run_command("pkg_sync", {"mode": ["pull", "push"], "override": override}), 1000)
            else:
                sublime.error_message("Invalid Path %s" % path)

            # Add on on_change listener
            sublime.set_timeout(lambda: settings.add_on_change("package_syncing", tools.restart_watcher), 500)

        self.window.show_input_panel("Sync Folder", sync_folder, on_done, None, None)


def plugin_loaded():
    """Register the settings listener, then start the watcher and a sync."""
    s = sublime.load_settings("Package Syncing.sublime-settings")
    s.clear_on_change("package_syncing")
    s.add_on_change("package_syncing", tools.restart_watcher)
    sublime.save_settings("Package Syncing.sublime-settings")

    # Start watcher
    sublime.set_timeout(lambda: tools.start_watcher(tools.load_settings()), 100)

    # Run pkg_sync
    sublime.set_timeout(lambda: sublime.run_command("pkg_sync", {"mode": ["pull", "push"]}), 1000)


def plugin_unloaded():
    """Remove the settings listener and stop the watcher."""
    s = sublime.load_settings("Package Syncing.sublime-settings")
    s.clear_on_change("package_syncing")
    sublime.save_settings("Package Syncing.sublime-settings")

    # Stop folder watcher
    tools.stop_watcher()


# The excerpt is truncated after the next line; its body is not visible.
if sublime.version()[0] == "2":...
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
