Best Python code snippet using playwright-python
content.py
Source:content.py  
...69        ''' Filter a model in a batch model list. '''70        if not isinstance(m, ndb.Model):71            return False72        return True73    def _fulfill(self, key):74        ''' Fulfill a model, given a key, either from local cache or the datastore. '''75        key = self._filter_key(key)76        if key in self.seen:77            cached_model = {78                models.ContentNamespace: self.namespaces,79                models.ContentArea: self.areas,80                models.ContentSnippet: self.snippets,81                models.ContentSummary: self.summaries82            }.get(key.kind()).get(key)83            # perhaps it's already here?84            if cached_model is not None:85                # yay86                return cached_model87            # perhaps we're still waiting?88            elif key in futures:89                # block :(90                return self.futures.get(key).get_result()91            else:92                # wtf why was it seen then93                return key.get()94        else:95            # just get it96            return key.get()97    @tasklet98    def _batch_fulfill(self, batch, query=None, **kwargs):99        ''' Synchronously retrieve a batch of keys. '''100        # if it's a future, we're probably coming from a now-finished query101        if isinstance(batch, ndb.Future):102            batch = batch.get_result()103        # if it's not a list make it a list (obvs cuz this is _batch_ fulfill)104        if not isinstance(batch, list):105            batch = [batch]106        # filter out so we only get keys107        keys = map(self._filter_key, batch)108        # check for the hashed keygroup109        keygroup_s = tuple([s for s in keys])110        heuristic = self.heuristics.get(keygroup_s)111        if heuristic is not None:112            return [m for b, m in self.keygroups.get(heuristic)]113        return ndb.get_multi(keys, **kwargs)114    @tasklet115    def _batch_fulfill_async(self, batch, query=None, resultset_callback=None, item_callback=None, **kwargs):116        ''' Asynchronously retrieve a batch of keys. '''117        # if it's a future, we're probably coming from a now-finished query118        if isinstance(batch, ndb.Future):119            batch = batch.get_result()120        # if it's not a list make it a list (obvs cuz this is _batch_ fulfill async)121        if not isinstance(batch, list):122            batch = [batch]123        # build fetch!124        keys = map(self._filter_key, batch)125        # check for the hashed keygroup126        keygroup_s = tuple([s for s in keys])127        heuristic = self.heuristics.get(keygroup_s)128        if heuristic is not None:129            if resultset_callback:130                resultset_callback([m for b, m in self.keygroups.get(heuristic)], query=query)131            elif item_callback:132                map(lambda x: item_callback(x, query=query), [m for b, m in self.keygroups.get(heuristic)])133        else:134            # build operations135            op = ndb.get_multi_async(batch, **kwargs)136            # if we want results in batch137            if resultset_callback and not item_callback:138                # build a dependent multifuture that waits on all to finish139                multifuture = ndb.MultiFuture()140                # add each operation as a dependent141                for f in op:142                    multifuture.add_dependent(f)143                # close dependents144                multifuture.complete()145                # add callback to catch results in batch146                if query:147                    multifuture.add_callback(resultset_callback, multifuture, query=query)148                else:149                    multifuture.add_callback(resultset_callback, multifuture)150            # if we want results one by one151            elif item_callback and not resultset_callback:152                # add the item callback to each future153                for f in op:154                    f.add_callback(item_callback, f, query=query)155            # parallel yield!156            yield tuple(op)157    @tasklet158    def _run_query(self, query, count=False, **kwargs):159        ''' Synchronously run a query. '''160        # if we should count first, count161        if not isinstance(count, int) and count == True:162            count = query.count()163        # pull via count or via kwargs164        if isinstance(count, int):165            return query.fetch(count, **kwargs)166        return query.fetch(**kwargs)167    @tasklet168    def _run_query_async(self, query, count=False, future=None, resultset_callback=None, item_callback=None, eventual_callback=None, **kwargs):169        ''' Asynchronously run a query. '''170        # if we just counted171        if future:172            # we have a limit173            count = future.get_result()174            # there are no results175            if count == 0:176                yield resultset_callback([], query=query)177        # if we should count first, kick off a count and call self with the result178        if not isinstance(count, int) and count is True:179            future = query.count_async(**kwargs)180            future.add_callback(self._run_query_async, query, False, future, resultset_callback, item_callback, **kwargs)181            yield future182        else:183            # if we want to go over things in batch184            if resultset_callback and not item_callback:185                if isinstance(count, int):186                    future = query.fetch_async(count, **kwargs)187                else:188                    future = query.fetch_async(**kwargs)189                if eventual_callback:190                    future.add_callback(resultset_callback, future, query=query, resultset_callback=eventual_callback)191                else:192                    future.add_callback(resultset_callback, future, query=query)193            # if we want to go over things one at a time194            elif item_callback and not resultset_callback:195                if isinstance(count, int):196                    future = query.map_async(item_callback, limit=count, **kwargs)197                else:198                    future = query.map_async(item_callback, **kwargs)199            yield future200    @tasklet201    def _batch_store_callback(self, result, query=None):202        ''' Callback for asynchronously retrieved results. '''203        # landing from a query204        if isinstance(result, ndb.Future):205            result = result.get_result()206        # should really be a list207        if not isinstance(result, list):208            result = [result]209        _batch = []210        timestamp = int(time.time())211        for i in filter(self._filter_model, result):212            if i.key not in self.seen:213                self.seen.append(i.key)214            _batch.append((i.key, i))215            if isinstance(i, models.ContentNamespace):216                self.namespaces.get(i.key, {}).update({'model': i, 'timestamp': timestamp})217            elif isinstance(i, models.ContentArea):218                self.areas.get(i.key, {}).update({'model': i, 'timestamp': timestamp})219            elif isinstance(i, models.ContentSnippet):220                self.snippets.get(i.key, {}).update({'model': i, 'timestamp': timestamp})221            elif isinstance(i, models.ContentSummary):222                self.summaries.get(i.key, {}).update({'model': i, 'timestamp': timestamp})223        # generate the keygroup's signature and use it to look for an existing heuristics hint224        keygroup_s = tuple([b for b, i in _batch])225        keygroup_i = self.heuristics.get(keygroup_s, False)226        # trim the old one227        if keygroup_i:228            self.keygroups.remove(keygroup_i)229        # add the new keygroup and map it230        self.keygroups.append(_batch)231        self.heuristics[keygroup_s] = self.keygroups.index(_batch)232    def _build_keysonly_query(self, kind, parent=None, **kwargs):233        ''' Build a keys-only ndb.Query object with the _keys_only options object. '''234        return kind.query(ancestor=parent, default_options=_keys_only_opt.merge(ndb.QueryOptions(**kwargs)))235    def _build_projection_query(self, kind, properties, parent=None, **kwargs):236        ''' Build a projection ndb.Query object with the default _projection_opt options object. '''237        return kind.query(ancestor=parent, default_options=_projection_opt.merge(ndb.QueryOptions(projection=properties, **kwargs)))238    def _build_keygroup(self, keyname, namespace, snippet=False, summary=False):239        ''' Build a group of keys, always containing a ContentNamespace and ContentArea, optionally with a Snippet and Summary. '''240        keygroup = []241        # calculate namespace key242        namespace_key = self._build_namespace_key(namespace)243        keygroup.append(namespace)244        # calculate area key245        area_key = self._build_area_key(keyname, namespace)246        keygroup.append(area_key)247        # if a snippet key is requested248        if snippet:249            # you can pass a version into snippet manually, or True250            if isinstance(snippet, int):251                snippet_key = self._build_snippet_key(keyname, namespace, area_key, snippet)252            else:253                snippet_key = self._build_snippet_key(keyname, namespace, area_key)254            keygroup.append(snippet_key)255            # if a summary is requested256            if summary:257                # you can pass a version into summary manually, or True258                if isinstance(summary, int):259                    summary_key = self._build_summary_key(keyname, namespace, snippet_key, summary)260                else:261                    summary_key = self._build_summary_key(keyname, namespace, snippet_key)262                keygroup.append(snippet_key)263        return keygroup264    def _build_namespace_key(self, namespace):265        ''' Build a key for a ContentNamespace '''266        # if it's not the system namespace, it's key-worthy267        if namespace != _SYSTEM_NAMESPACE:268            if not isinstance(namespace, ndb.Key):269                try:270                    namespace = ndb.Key(models.ContentNamespace, ndb.Key(urlsafe=namespace).urlsafe())271                except Exception, e:272                    namespace = ndb.Key(models.ContentNamespace, hashlib.sha256(str(namespace)).hexdigest())273        else:274            namespace = ndb.Key(models.ContentNamespace, _SYSTEM_NAMESPACE)275        return namespace276    def _build_area_key(self, keyname, namespace):277        ''' Build a key for a ContentArea '''278        if not isinstance(namespace, ndb.Key):279            namespace = self._build_namespace_key(namespace)280        if (not isinstance(keyname, basestring)) or (not isinstance(namespace, ndb.Key)):281            raise ValueError("Must pass in a string keyname and ContentNamespace-compatible Key or string namespace.")282        else:283            return ndb.Key(models.ContentArea, hashlib.sha256(str(keyname)).hexdigest(), parent=namespace)284    def _build_snippet_key(self, keyname=None, namespace=None, area=None, version=1):285        ''' Build a key for a ContentSnippet '''286        if area is not None:287            return ndb.Key(models.ContentSnippet, str(version), parent=area)288        else:289            return ndb.Key(models.ContentSnippet, str(version), parent=self._build_area_key(keyname, namespace))290    def _build_summary_key(self, keyname, namespace, snippet=None, version=1):291        ''' Build a key for a ContentSummary '''292        if snippet is not None:293            return ndb.Key(models.ContentSummary, str(version), parent=snippet)294        else:295            return ndb.Key(models.ContentSummary, str(version), parent=self._build_snippet_key(keyname, namespace))296    def _find_callblocks(self, node):297        ''' Find callblocks in a node. '''298        ## @TODO: Implement AST walking.299        raise NotImplemented300    def _walk_jinja2_ast(self, ast):301        ''' Discover the power, of the haterade '''302        ## @TODO: Implement AST walking.303        raise NotImplemented304    def _pre_ast_hook(self, template_ast):305        ''' Pre-hook that is called right after the template AST is ready '''306        ## @TODO: Preload content with a pleasant walk down the AST.307        return template_ast308    def _compiled_ast_hook(self, code):309        ''' Post-hook that is called after the AST is compiled into a <code> object '''310        ## @TODO: Pick up the hints generated by pre_ast_hook.311        return code312    ## == Mid-level Methods == ##313    def _fulfill_content_area(self, keyname, namespace, caller):314        ''' Return a contentarea's content, or the caller's default, at render time. '''315        area = self._fulfill(self._build_area_key(keyname, namespace))316        if area is None:317            return caller()318        if area.local:319            return (area.html, area.text)320        else:321            snippet = self._fulfill(area.latest)322            if snippet is None:323                return caller()324            return (snippet.html, snippet.text)325    def _fulfill_content_snippet(self, keyname, namespace, caller):326        ''' Return a contentsnippet's content, or the caller's default, at render time. '''327        area = self._fulfill(self._build_area_key(keyname, namespace))328        if area is None:329            return caller()330        snippet = self._fulfill(area.latest)331        if snippet is None:332            return caller()333        return (snippet.html, snippet.text)334    def _fulfill_content_summary(self, keyname, namespace, caller):335        ''' Return a contentsummary's content, or the caller's default, at render time. '''336        area = self._fulfill(self._build_area_key(keyname, namespace))337        if area is None:338            return caller()339        snippet = self._fulfill(area.latest)340        if snippet is None:341            return caller()342        if snippet.summary is None:343            return caller()344        summary = self._fulfill(snippet.summary)345        if summary is None:346            return caller()347        return (summary.html, summary.text)348    def _load_template(self, _environment, name, direct=False):349        ''' Load a template from source/compiled packages and preprocess. '''350        ## check the internal API bytecode cache first351        if name not in self.templates:352            # get loader + bytecacher353            loader = _environment.loader354            bytecache = _environment.bytecode_cache355            try:356                # for modules: load directly (it's way faster)357                if not loader.has_source_access:358                    template = self._pre_ast_hook(loader.load(_environment, name, prepare=False))...app_state_api.py
Source:app_state_api.py  
...185    def is_fulfilled(application):186        """Returns True if the given application step has been fulfilled"""187        raise NotImplementedError188    @staticmethod189    def _fulfill(application, **kwargs):190        """Performs the necessary data manipulation to fulfill this step of the application"""191        raise NotImplementedError192    @staticmethod193    def _revert(application):194        """195        Performs the necessary data manipulation to ensure that this step of the application has not been fulfilled196        """197        raise NotImplementedError198    @classmethod199    def fulfill(cls, application, **kwargs):200        """201        Performs the necessary data manipulation to fulfill this step of the application, and ensures that the202        application is in the correct state afterwards203        """204        cls._fulfill(application, **kwargs)205        # NOTE: These functions perform some data manipulation on an application that aren't supported by normal206        # functionality, hence the manual setting of the state instead of using state transitions.207        application.refresh_from_db()208        state_idx = ORDERED_UNFINISHED_APP_STATES.index(cls.state)209        new_state = (210            AppStates.COMPLETE.value211            if state_idx == len(ORDERED_UNFINISHED_APP_STATES) - 1212            else ORDERED_UNFINISHED_APP_STATES[state_idx + 1]213        )214        application.state = new_state215        application.save()216    @classmethod217    def revert(cls, application):218        """219        Performs the necessary data manipulation to ensure that this step of the application has not been fulfilled,220        and ensures that the application is in the correct state afterwards221        """222        cls._revert(application)223        # NOTE: These functions perform some data manipulation on an application that aren't supported by normal224        # functionality, hence the manual setting of the state instead of using state transitions.225        application.refresh_from_db()226        application.state = cls.state227        application.save()228class AwaitingProfileStep(AppStep):229    """Provides functionality for fulfilling or reverting the 'awaiting profile' step of an application"""230    state = AppStates.AWAITING_PROFILE_COMPLETION.value231    @staticmethod232    def is_fulfilled(application):233        return is_user_info_complete(application.user)234    @staticmethod235    def _fulfill(application, **kwargs):236        fill_out_registration_info(application.user)237    @staticmethod238    def _revert(application):239        LegalAddress.objects.filter(user=application.user).delete()240class AwaitingResumeStep(AppStep):241    """Provides functionality for fulfilling or reverting the 'awaiting resume' step of an application"""242    state = AppStates.AWAITING_RESUME.value243    @staticmethod244    def is_fulfilled(application):245        return application.resume_upload_date is not None and (246            application.resume_file is not None or application.linkedin_url is not None247        )248    @staticmethod249    def _fulfill(application, **kwargs):250        with open(251            os.path.join(settings.BASE_DIR, DUMMY_RESUME_FILEPATH), "rb"252        ) as resume_file:253            application.add_resume(254                resume_file=File(resume_file, name=DUMMY_RESUME_FILENAME),255                linkedin_url=DUMMY_LINKEDIN_URL,256            )257        application.save()258    @staticmethod259    def _revert(application):260        if application.resume_file is not None:261            application.resume_file.delete()262        application.resume_file = None263        application.linkedin_url = None264        application.resume_upload_date = None265        application.save()266class AwaitingSubmissionsStep(AppStep):267    """Provides functionality for fulfilling or reverting the 'awaiting submissions' step of an application"""268    state = AppStates.AWAITING_USER_SUBMISSIONS.value269    @staticmethod270    def is_fulfilled(application):271        submissions = list(application.submissions.all())272        submission_review_statuses = [273            submission.review_status for submission in submissions274        ]275        if any(276            [status == REVIEW_STATUS_REJECTED for status in submission_review_statuses]277        ):278            return True279        elif any(280            [status == REVIEW_STATUS_PENDING for status in submission_review_statuses]281        ):282            return True283        elif len(submissions) < application.bootcamp_run.application_steps.count():284            return False285    @staticmethod286    def _fulfill(application, **kwargs):287        num_to_fulfill = kwargs.get("num_submissions", None)288        run_steps = application.bootcamp_run.application_steps.order_by(289            "application_step__step_order"290        ).all()291        num_to_fulfill = num_to_fulfill or len(run_steps)292        if num_to_fulfill and num_to_fulfill > len(run_steps):293            raise ValidationError(294                "{} step(s) exist. Cannot fulfill {}.".format(295                    len(run_steps), num_to_fulfill296                )297            )298        for i, run_step in enumerate(run_steps):299            if i >= num_to_fulfill:300                break301            submission_factory = SUBMISSION_FACTORIES[302                run_step.application_step.submission_type303            ]304            submission_factory(application, run_step)305    @staticmethod306    def _revert(application):307        application.submissions.all().delete()308class AwaitingReviewStep(AppStep):309    """Provides functionality for fulfilling or reverting the 'awaiting submission review' step of an application"""310    state = AppStates.AWAITING_SUBMISSION_REVIEW.value311    @staticmethod312    def is_fulfilled(application):313        submissions = list(application.submissions.all())314        submission_review_statuses = [315            submission.review_status for submission in submissions316        ]317        return len(submissions) > 0 and len(submissions) == len(318            [319                status320                for status in submission_review_statuses321                if status in SUBMISSION_REVIEW_COMPLETED_STATES322            ]323        )324    @staticmethod325    def _fulfill(application, **kwargs):326        num_to_fulfill = kwargs.get("num_reviews", None)327        submissions = list(328            application.submissions.order_by(329                "run_application_step__application_step__step_order"330            ).all()331        )332        num_to_fulfill = num_to_fulfill or len(submissions)333        if num_to_fulfill and num_to_fulfill > len(submissions):334            raise ValidationError(335                "{} submission(s) exist. Cannot fulfill {}.".format(336                    len(submissions), num_to_fulfill337                )338            )339        now = now_in_utc()340        for i, submission in enumerate(submissions):341            if i >= num_to_fulfill:342                break343            submission.review_status = REVIEW_STATUS_APPROVED344            submission.review_status_date = now345            submission.save()346    @staticmethod347    def _revert(application):348        application.submissions.update(349            review_status=REVIEW_STATUS_PENDING, review_status_date=None350        )351class AwaitingPaymentStep(AppStep):352    """Provides functionality for fulfilling or reverting the 'awaiting payment' step of an application"""353    state = AppStates.AWAITING_PAYMENT.value354    @staticmethod355    def is_fulfilled(application):356        return application.is_paid_in_full357    @staticmethod358    def _fulfill(application, **kwargs):359        run = application.bootcamp_run360        total_run_price = run.price361        order, _ = Order.objects.update_or_create(362            user=application.user,363            application=application,364            defaults=dict(status=Order.FULFILLED, total_price_paid=total_run_price),365        )366        Line.objects.update_or_create(367            order=order, bootcamp_run=run, defaults=dict(price=total_run_price)368        )369        complete_successful_order(order, send_receipt=False)370    @staticmethod371    def _revert(application):372        Order.objects.filter(application=application).delete()...__init__.py
Source:__init__.py  
...32        self.reason = None33        self._callbacks = []34        self._errbacks = []35        self.name = name36    def _fulfill(self, value):37        """38        Fulfill the promise with a given value.39        """40        assert self._state == self.PENDING, "Promise state is not pending"41        self._state = self.FULFILLED42        self.value = value43        for callback in self._callbacks:44            try:45                callback(value)46            except Exception:47                # Ignore errors in callbacks48                self.logger.exception("Error in callback %s", callback)49        # We will never call these callbacks again, so allow50        # them to be garbage collected.  This is important since51        # they probably include closures which are binding variables52        # that might otherwise be garbage collected.53        self._callbacks = []54        self._errbacks = []55    def fulfill(self, value):56        self._fulfill(value)57        return self58    def _reject(self, reason, bubbling=False):59        """60        Reject this promise for a given reason.61        """62        assert self._state == self.PENDING, "Promise state is not pending"63        if not bubbling and log_error(reason):64            exc_info = sys.exc_info()65            self.logger.debug("Promise (%s) rejected: %s", self.name, reason,66                              exc_info=exc_info[0] and exc_info or None)67            self.logger.debug(self._errbacks)68            if self.logger.isEnabledFor(DEBUG):69                print_stack()70        else:71            pass72        self._state = self.REJECTED73        self.reason = reason74        for errback in self._errbacks:75            try:76                errback(reason)77            except Exception:78                self.logger.exception("Error in errback %s", errback)79                # Ignore errors in callbacks80        # We will never call these errbacks again, so allow81        # them to be garbage collected.  This is important since82        # they probably include closures which are binding variables83        # that might otherwise be garbage collected.84        self._errbacks = []85        self._callbacks = []86    def reject(self, reason):87        self._reject(reason)88        return self89    def isPending(self):90        """Indicate whether the Promise is still pending."""91        return self._state == self.PENDING92    def isFulfilled(self):93        """Indicate whether the Promise has been fulfilled."""94        return self._state == self.FULFILLED95    def isRejected(self):96        """Indicate whether the Promise has been rejected."""97        return self._state == self.REJECTED98    def get(self, timeout=None):99        """Get the value of the promise, waiting if necessary."""100        self.wait(timeout)101        if self._state == self.FULFILLED:102            return self.value103        raise self.reason104    def wait(self, timeout=None):105        """106        An implementation of the wait method which doesn't involve107        polling but instead utilizes a "real" synchronization108        scheme.109        """110        import threading111        if self._state != self.PENDING:112            return113        e = threading.Event()114        self.addCallback(lambda v: e.set())115        self.addErrback(lambda r: e.set())116        e.wait(timeout)117    def addCallback(self, f):118        """119        Add a callback for when this promise is fulfilled.  Note that120        if you intend to use the value of the promise somehow in121        the callback, it is more convenient to use the 'then' method.122        """123        self._callbacks.append(f)124    def addErrback(self, f):125        """126        Add a callback for when this promise is rejected.  Note that127        if you intend to use the rejection reason of the promise128        somehow in the callback, it is more convenient to use129        the 'then' method.130        """131        self._errbacks.append(f)132    if sys.subversion[0] != "CPython":133        def _invoke(self, func, value):134            try:135                if value is None:136                    args, _, _, _ = getargspec(func)137                    arglen = len(args)138                    if not arglen or (arglen == 1 and ismethod(func)):139                        return func()140                return func(value)141            except Exception as e:142                if log_error(e):143                    self.logger.exception("Error in handler %s", func)144                else:145                    self.logger.debug("Error in handler %s: %s", func, e)146                raise147    else:148        def _invoke(self, func, value):149            try:150                if value is None:151                    try:152                        target = func.im_func153                    except AttributeError:154                        argcount = func.func_code.co_argcount155                    else:156                        argcount = target.func_code.co_argcount - 1157                    if argcount == 0:158                        return func()159                return func(value)160            except Exception as e:161                if log_error(e):162                    self.logger.exception("Error in handler %s", func)163                else:164                    self.logger.debug("Error in handler %s: %s", func, repr(e))165                raise166    def __enter__(self):167        return self168    def __exit__(self, exc_type, exc_value, exc_tb):169        if self.isPending():170            if exc_value is not None:171                if log_error(exc_value):172                    self.logger.exception("Promise automatically rejected")173                self._reject(exc_value, bubbling=True)174                return True175            else:176                self.fulfill(None)177    def then(self, success=None, failure=None, name=None):178        """179        This method takes two optional arguments.  The first argument180        is used if the "self promise" is fulfilled and the other is181        used if the "self promise" is rejected.  In either case, this182        method returns another promise that effectively represents183        the result of either the first of the second argument (in the184        case that the "self promise" is fulfilled or rejected,185        respectively).186        Each argument can be either:187          * None - Meaning no action is taken188          * A function - which will be called with either the value189            of the "self promise" or the reason for rejection of190            the "self promise".  The function may return:191            * A value - which will be used to fulfill the promise192              returned by this method.193            * A promise - which, when fulfilled or rejected, will194              cascade its value or reason to the promise returned195              by this method.196          * A value - which will be assigned as either the value197            or the reason for the promise returned by this method198            when the "self promise" is either fulfilled or rejected,199            respectively.200        """201        if name is None:202            try:203                name = success.__name__204            except AttributeError:205                name = str(success)206        ret = Promise(name=name)207        state = self._state208        if state == self.PENDING:209            """210            If this is still pending, then add callbacks to the211            existing promise that call either the success or212            rejected functions supplied and then fulfill the213            promise being returned by this method214            """215            def callAndFulfill(v):216                """217                A callback to be invoked if the "self promise"218                is fulfilled.219                """220                try:221                    # From 3.2.1, don't call non-functions values222                    if callable(success):223                        newvalue = self._invoke(success, v)224                        if _isPromise(newvalue):225                            newvalue.then(ret._fulfill,226                                          ret._reject)227                        else:228                            ret._fulfill(newvalue)229                    else:230                        # From 3.2.6.4231                        ret._fulfill(v)232                except Exception as e:233                    ret._reject(e)234            def callAndReject(r):235                """236                A callback to be invoked if the "self promise"237                is rejected.238                """239                try:240                    if callable(failure):241                        newvalue = failure(r)242                        if _isPromise(newvalue):243                            newvalue.then(ret._fulfill,244                                          ret._reject)245                        else:246                            ret._fulfill(newvalue)247                    else:248                        # From 3.2.6.5249                        ret._reject(r)250                except Exception as e:251                    ret._reject(e)252            self._callbacks.append(callAndFulfill)253            self._errbacks.append(callAndReject)254        elif state == self.FULFILLED:255            # If this promise was already fulfilled, then256            # we need to use the first argument to this method257            # to determine the value to use in fulfilling the258            # promise that we return from this method.259            try:260                if callable(success):261                    newvalue = self._invoke(success, self.value)262                    if _isPromise(newvalue):263                        newvalue.then(ret._fulfill,264                                      lambda r: ret._reject(r, bubbling=True))265                    else:266                        ret._fulfill(newvalue)267                else:268                    # From 3.2.6.4269                    ret._fulfill(self.value)270            except Exception as e:271                ret._reject(e)272        else:273            # If this promise was already rejected, then274            # we need to use the second argument to this method275            # to determine the value to use in fulfilling the276            # promise that we return from this method.277            try:278                if callable(failure):279                    newvalue = self._invoke(failure, self.reason)280                    if _isPromise(newvalue):281                        newvalue.then(ret._fulfill,282                                      ret._reject)283                    else:284                        ret._fulfill(newvalue)285                else:286                    # From 3.2.6.5287                    ret._reject(self.reason, bubbling=True)288            except Exception as e:289                ret._reject(e)290        return ret291def _isPromise(obj):292    """293    A utility function to determine if the specified294    object is a promise using "duck typing".295    """296    if isinstance(obj, Promise):297        return True298    try:299        return callable(obj.fulfill) and callable(obj.reject) and\300            callable(obj.then)301    except AttributeError:302        return False303def listPromise(*args):304    """305    A special function that takes a bunch of promises306    and turns them into a promise for a vector of values.307    In other words, this turns an list of promises for values308    into a promise for a list of values.309    """310    ret = Promise()311    def handleSuccess(v, ret):312        for arg in args:313            if not arg.isFulfilled():314                return315        value = map(lambda p: p.value, args)316        ret._fulfill(value)317    for arg in args:318        arg.addCallback(lambda v: handleSuccess(v, ret))319        arg.addErrback(lambda r: ret.reject(r))320    # Check to see if all the promises are already fulfilled321    handleSuccess(None, ret)322    return ret323def dictPromise(m):324    """325    A special function that takes a dictionary of promises326    and turns them into a promise for a dictionary of values.327    In other words, this turns an dictionary of promises for values328    into a promise for a dictionary of values.329    """330    ret = Promise()...test_promise.py
Source:test_promise.py  
...29    pro = TDOPromise(chain, 0, 1)30    left, right = pro.split(1)31    assert left == pro32    assert right is None33def test_promise_fulfill():34    pro = TDOPromise(chain, 0, 5)35    pro._fulfill(bitarray('10101'))36    assert pro() == bitarray('10101')37    pro = TDOPromise(chain, 0, 5)38    pro._fulfill(bitarray('01010'))39    assert pro() == bitarray('01010')40def test_promise_split_fulfill():41    correct = bitarray('0011011111001')42    for i in range(len(correct)):43        print("Iteration", i)44        pro = TDOPromise(chain, 0, len(correct))45        cl, cr = correct[:i], correct[i:]46        l,r = pro.split(i)47        print(pro._components)48        print(l)49        print(r)50        if l:51            print("Load L", cl)52            l._fulfill(cl)53        if r:54            print("Load R", cr)55            r._fulfill(cr)56        assert pro() == correct57        print()58def test_composite_arbitrary_double_split_left():59    correct = bitarray('0011011111001')60    print("CORRECT", correct.to01())61    for i in range(1,len(correct)-1):62        cl, cr = correct[:i], correct[i:]63        for j in range(1, len(correct)-i):64            pro = TDOPromise(chain, 0, len(correct))65            l,r = pro.split(i)66            r1, r2 = r.split(j)67            cr1, cr2 = cr[:j], cr[j:]68            print("CORRECT", cl.to01(), cr1.to01(), cr2.to01())69            print("Processing", i, j)70            l._fulfill(cl)71            r1._fulfill(cr1)72            r2._fulfill(cr2)73            assert pro() == correct74def test_promise_double_split_right_fulfill():75    correct = bitarray('0011011111001')76    print("CORRECT", correct.to01())77    for i in range(len(correct)-1, 1, -1):78        for j in range(i-1, 0, -1):79            pro = TDOPromise(chain, 0, len(correct))80            cl, cr = correct[:i], correct[i:]81            cl1, cl2 = cl[:j], cl[j:]82            l,r = pro.split(i)83            l1, l2 = l.split(j)84            l1._fulfill(cl1)85            l2._fulfill(cl2)86            r._fulfill(cr)87            assert pro() == correct88def test_promisecollection_creation():89    pro = TDOPromise(chain, 0, 8)90    promises = TDOPromiseCollection(chain)91    assert not bool(promises)92    promises.add(pro, 0)93    assert bool(promises)94def test_promisecollection_make_offset_sub():95    pro0 = TDOPromise(chain, 0, 8)96    pro1 = TDOPromise(chain, 0, 8)97    promises = TDOPromiseCollection(chain)98    promises.add(pro0, 0)99    promises.add(pro1, 8)100    offsetpromises = promises.makesubatoffset(5)101    offsetpromises._fulfill(bitarray('000001111111011001010'))102    assert pro0() == bitarray('11111110')103    assert pro1() == bitarray('11001010')104def test_promisecollection_make_offset_sub_with_ideal_tdo():105    pro0 = TDOPromise(chain, 0, 8)106    pro1 = TDOPromise(chain, 0, 8)107    promises = TDOPromiseCollection(chain)108    promises.add(pro0, 0)109    promises.add(pro1, 16, _offsetideal=8)110    offsetpromises = promises.makesubatoffset(5, _offsetideal=0)111    print(offsetpromises)112    offsetpromises._fulfill(bitarray('1111111011001010'),113                            ignore_nonpromised_bits=True)114    assert pro0() == bitarray('11111110')115    assert pro1() == bitarray('11001010')116def test_promisecollection_fulfill():117    pro = TDOPromise(chain, 0, 8)118    promises = TDOPromiseCollection(chain)119    promises.add(pro, 0)120    promises._fulfill(bitarray('11001010'))121    assert pro() == bitarray('11001010')122    pro = TDOPromise(chain, 0, 10)123    promises = TDOPromiseCollection(chain)124    promises.add(pro, 2)125    promises._fulfill(bitarray('0011001010'))126    assert pro() == bitarray('11001010')127def test_promisecollection_nogaps_split_fulfill():128    correct = bitarray('1111111011001010')129    #DOES NOT AUTOSCALE TO LARGER DATA130    for i in range(len(correct)):131        print("Iteration", i)132        pro0 = TDOPromise(chain, 0, 8)133        pro1 = TDOPromise(chain, 0, 8)134        promises = TDOPromiseCollection(chain)135        promises.add(pro0, 0)136        promises.add(pro1, 8)137        cl, cr = correct[:i], correct[i:]138        l,r = promises.split(i)139        print("promises", promises._promises)140        print("L", l)141        print("R", r)142        if l:143            print("Load L", cl)144            l._fulfill(cl)145        if r:146            print("Load R", cr)147            r._fulfill(cr)148        assert pro0() == correct[:8]149        assert pro1() == correct[8:]150        print()151def test_promisecollection_gap_split_fulfill():152    indat = bitarray('111111100011001010')153    correct = bitarray('1111111011001010')154    #DOES NOT AUTOSCALE TO LARGER DATA155    for i in range(len(indat)):156        print("Iteration", i)157        pro0 = TDOPromise(chain, 0, 8)158        pro1 = TDOPromise(chain, 0, 8)159        promises = TDOPromiseCollection(chain)160        promises.add(pro0, 0)161        promises.add(pro1, 10)162        cl, cr = indat[:i], indat[i:]163        l,r = promises.split(i)164        print("promises", promises._promises)165        print("L", l)166        print("R", r)167        if l:168            print("Load L", cl)169            l._fulfill(cl)170        if r:171            print("Load R", cr)172            r._fulfill(cr)173        assert pro0() == correct[:8]174        assert pro1() == correct[8:]175        print()176def test_promisecollection_double_split_left_fulfill():177    correct = bitarray('1111111011001010')178    print("CORRECT", correct.to01())179    for i in range(1,len(correct)-1):180        cl, cr = correct[:i], correct[i:]181        for j in range(1, len(correct)-i):182            pro0 = TDOPromise(chain, 0, 8)183            pro1 = TDOPromise(chain, 0, 8)184            promises = TDOPromiseCollection(chain)185            promises.add(pro0, 0)186            promises.add(pro1, 8)187            l,r = promises.split(i)188            r1, r2 = r.split(j)189            cr1, cr2 = cr[:j], cr[j:]190            l._fulfill(cl)191            r1._fulfill(cr1)192            r2._fulfill(cr2)193            assert pro0() == correct[:8]194            assert pro1() == correct[8:]195def test_promisecollection_double_split_right_fulfill():196    correct = bitarray('1111111011001010')197    print("CORRECT", correct.to01())198    for i in range(len(correct)-1, 1, -1):199        for j in range(i-1, 0, -1):200            pro0 = TDOPromise(chain, 0, 8)201            pro1 = TDOPromise(chain, 0, 8)202            promises = TDOPromiseCollection(chain)203            promises.add(pro0, 0)204            promises.add(pro1, 8)205            cl, cr = correct[:i], correct[i:]206            cl1, cl2 = cl[:j], cl[j:]207            l,r = promises.split(i)208            l1, l2 = l.split(j)209            l1._fulfill(cl1)210            l2._fulfill(cl2)211            r._fulfill(cr)212            assert pro0() == correct[:8]...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
