Best Python code snippet using localstack_python
__init__.py
Source:__init__.py  
# NOTE: this excerpt starts in the middle of a definition whose header is not
# visible. Its visible tail is kept verbatim:
#     get_library().set_max_log_level(level)


def invoke(name, argtypes, *args):
    """Perform a synchronous library function call.

    Forwards ``name``, ``argtypes`` and ``args`` to the loaded library
    wrapper. Nothing is returned; callers in this module pass ``byref(...)``
    arguments to receive output.
    """
    get_library().invoke(name, argtypes, *args)


def invoke_async(name: str, argtypes, *args, return_type=None) -> asyncio.Future:
    """Perform an asynchronous library function call.

    Returns whatever the library wrapper's ``invoke_async`` returns for the
    given ``return_type``.
    """
    return get_library().invoke_async(name, argtypes, *args, return_type=return_type)


def generate_raw_key(seed: Union[str, bytes] = None) -> str:
    """Generate a new raw store wrapping key.

    ``seed`` is passed through as a byte buffer argument; the generated key is
    written into a ``StrBuffer`` and returned as ``str``.
    """
    key = StrBuffer()
    invoke(
        "askar_store_generate_raw_key",
        (FfiByteBuffer, POINTER(StrBuffer)),
        seed,
        byref(key),
    )
    return str(key)


def version() -> str:
    """Get the version of the installed library."""
    return get_library().version()


async def store_open(
    uri: str, key_method: str = None, pass_key: str = None, profile: str = None
) -> StoreHandle:
    """Open an existing Store and return the open handle.

    ``key_method`` is lower-cased before being passed on (``None`` is kept).
    """
    return await invoke_async(
        "askar_store_open",
        (FfiStr, FfiStr, FfiStr, FfiStr),
        uri,
        key_method and key_method.lower(),
        pass_key,
        profile,
        return_type=StoreHandle,
    )


async def store_provision(
    uri: str,
    key_method: str = None,
    pass_key: str = None,
    profile: str = None,
    recreate: bool = False,
) -> StoreHandle:
    """Provision a new Store and return the open handle.

    ``key_method`` is lower-cased before being passed on (``None`` is kept).
    """
    return await invoke_async(
        "askar_store_provision",
        (FfiStr, FfiStr, FfiStr, FfiStr, c_int8),
        uri,
        key_method and key_method.lower(),
        pass_key,
        profile,
        recreate,
        return_type=StoreHandle,
    )


async def store_create_profile(handle: StoreHandle, name: str = None) -> str:
    """Create a new profile in a Store and return the library's reply as ``str``."""
    return str(
        await invoke_async(
            "askar_store_create_profile",
            (StoreHandle, FfiStr),
            handle,
            name,
            return_type=StrBuffer,
        )
    )


async def store_get_profile_name(handle: StoreHandle) -> str:
    """Get the name of the default Store instance profile."""
    return str(
        await invoke_async(
            "askar_store_get_profile_name",
            (StoreHandle,),
            handle,
            return_type=StrBuffer,
        )
    )


async def store_remove_profile(handle: StoreHandle, name: str) -> bool:
    """Remove an existing profile from a Store.

    Returns ``True`` when the library reports a non-zero result.
    """
    return (
        await invoke_async(
            "askar_store_remove_profile",
            (StoreHandle, FfiStr),
            handle,
            name,
            return_type=c_int8,
        )
        != 0
    )


async def store_rekey(
    handle: StoreHandle,
    key_method: str = None,
    pass_key: str = None,
) -> StoreHandle:
    """Replace the store key on a Store.

    ``key_method`` is lower-cased before being passed on (``None`` is kept).
    """
    # NOTE(review): the annotation says StoreHandle but the call requests
    # return_type=c_int8 -- confirm which one is intended.
    return await invoke_async(
        "askar_store_rekey",
        (StoreHandle, FfiStr, FfiStr),
        handle,
        key_method and key_method.lower(),
        pass_key,
        return_type=c_int8,
    )


async def store_remove(uri: str) -> bool:
    """Remove an existing Store, if any.

    Returns ``True`` when the library reports a non-zero result.
    """
    return (
        await invoke_async(
            "askar_store_remove",
            (FfiStr,),
            uri,
            return_type=c_int8,
        )
        != 0
    )


async def session_start(
    handle: StoreHandle, profile: Optional[str] = None, as_transaction: bool = False
) -> SessionHandle:
    """Start a new session with an open Store."""
    handle = await invoke_async(
        "askar_session_start",
        (StoreHandle, FfiStr, c_int8),
        handle,
        profile,
        as_transaction,
        return_type=SessionHandle,
    )
    return handle


async def session_count(
    handle: SessionHandle, category: str, tag_filter: Union[str, dict] = None
) -> int:
    """Count rows in the Store."""
    return int(
        await invoke_async(
            "askar_session_count",
            (SessionHandle, FfiStr, FfiJson),
            handle,
            category,
            tag_filter,
            return_type=c_int64,
        )
    )


async def session_fetch(
    handle: SessionHandle, category: str, name: str, for_update: bool = False
) -> EntryListHandle:
    """Fetch a row from the Store."""
    return await invoke_async(
        "askar_session_fetch",
        (SessionHandle, FfiStr, FfiStr, c_int8),
        handle,
        category,
        name,
        for_update,
        return_type=EntryListHandle,
    )


async def session_fetch_all(
    handle: SessionHandle,
    category: str,
    tag_filter: Union[str, dict] = None,
    limit: int = None,
    for_update: bool = False,
) -> EntryListHandle:
    """Fetch all matching rows in the Store.

    A ``limit`` of ``None`` is sent to the library as ``-1``.
    """
    return await invoke_async(
        "askar_session_fetch_all",
        (SessionHandle, FfiStr, FfiJson, c_int64, c_int8),
        handle,
        category,
        tag_filter,
        limit if limit is not None else -1,
        for_update,
        return_type=EntryListHandle,
    )


async def session_remove_all(
    handle: SessionHandle,
    category: str,
    tag_filter: Union[str, dict] = None,
) -> int:
    """Remove all matching rows in the Store."""
    return int(
        await invoke_async(
            "askar_session_remove_all",
            (SessionHandle, FfiStr, FfiJson),
            handle,
            category,
            tag_filter,
            return_type=c_int64,
        )
    )


async def session_update(
    handle: SessionHandle,
    operation: EntryOperation,
    category: str,
    name: str,
    value: Union[str, bytes] = None,
    tags: dict = None,
    expiry_ms: Optional[int] = None,
):
    """Update a Store by inserting, updating, or removing a record.

    ``operation.value`` selects the operation; an ``expiry_ms`` of ``None`` is
    sent to the library as ``-1``.
    """
    return await invoke_async(
        "askar_session_update",
        (SessionHandle, c_int8, FfiStr, FfiStr, FfiByteBuffer, FfiTagsJson, c_int64),
        handle,
        operation.value,
        category,
        name,
        value,
        tags,
        -1 if expiry_ms is None else expiry_ms,
    )


async def session_insert_key(
    handle: SessionHandle,
    key_handle: LocalKeyHandle,
    name: str,
    metadata: str = None,
    tags: dict = None,
    expiry_ms: Optional[int] = None,
):
    """Call ``askar_session_insert_key`` for ``key_handle`` under ``name``.

    An ``expiry_ms`` of ``None`` is sent to the library as ``-1``.
    """
    return await invoke_async(
        "askar_session_insert_key",
        (SessionHandle, LocalKeyHandle, FfiStr, FfiStr, FfiTagsJson, c_int64),
        handle,
        key_handle,
        name,
        metadata,
        tags,
        -1 if expiry_ms is None else expiry_ms,
    )


async def session_fetch_key(
    handle: SessionHandle, name: str, for_update: bool = False
) -> KeyEntryListHandle:
    """Call ``askar_session_fetch_key`` for ``name`` and return the list handle."""
    return await invoke_async(
        "askar_session_fetch_key",
        (SessionHandle, FfiStr, c_int8),
        handle,
        name,
        for_update,
        return_type=KeyEntryListHandle,
    )


async def session_fetch_all_keys(
    handle: SessionHandle,
    alg: Union[str, KeyAlg] = None,
    thumbprint: str = None,
    tag_filter: Union[str, dict] = None,
    limit: int = None,
    for_update: bool = False,
) -> KeyEntryListHandle:
    """Fetch all matching keys in the Store.

    A ``KeyAlg`` member is replaced by its ``.value``; a ``limit`` of ``None``
    is sent to the library as ``-1``.
    """
    if isinstance(alg, KeyAlg):
        alg = alg.value
    return await invoke_async(
        "askar_session_fetch_all_keys",
        (SessionHandle, FfiStr, FfiStr, FfiJson, c_int64, c_int8),
        handle,
        alg,
        thumbprint,
        tag_filter,
        limit if limit is not None else -1,
        for_update,
        return_type=KeyEntryListHandle,
    )


async def session_update_key(
    handle: SessionHandle,
    name: str,
    metadata: str = None,
    tags: dict = None,
    expiry_ms: Optional[int] = None,
):
    """Call ``askar_session_update_key`` for ``name``; returns ``None``.

    An ``expiry_ms`` of ``None`` is sent to the library as ``-1``.
    """
    await invoke_async(
        "askar_session_update_key",
        (SessionHandle, FfiStr, FfiStr, FfiTagsJson, c_int64),
        handle,
        name,
        metadata,
        tags,
        -1 if expiry_ms is None else expiry_ms,
    )


async def session_remove_key(handle: SessionHandle, name: str):
    """Call ``askar_session_remove_key`` for ``name``; returns ``None``."""
    await invoke_async(
        "askar_session_remove_key",
        (SessionHandle, FfiStr),
        handle,
        name,
    )


async def scan_start(
    handle: StoreHandle,
    profile: Optional[str],
    category: str,
    tag_filter: Union[str, dict] = None,
    offset: int = None,
    limit: int = None,
) -> ScanHandle:
    """Create a new Scan against the Store.

    A falsy ``offset`` is sent as ``0`` and a ``limit`` of ``None`` as ``-1``.
    """
    return await invoke_async(
        "askar_scan_start",
        (StoreHandle, FfiStr, FfiStr, FfiJson, c_int64, c_int64),
        handle,
        profile,
        category,
        tag_filter,
        offset or 0,
        limit if limit is not None else -1,
        return_type=ScanHandle,
    )


async def scan_next(handle: ScanHandle) -> EntryListHandle:
    """Call ``askar_scan_next`` on ``handle`` and return the entry list handle."""
    return await invoke_async(
        "askar_scan_next", (ScanHandle,), handle, return_type=EntryListHandle
    )


def entry_list_count(handle: EntryListHandle) -> int:
    """Return the count the library reports for the entry list ``handle``."""
    # Named `count` rather than `len` so the builtin is not shadowed.
    count = c_int32()
    invoke(
        "askar_entry_list_count",
        (EntryListHandle, POINTER(c_int32)),
        handle,
        byref(count),
    )
    return count.value


# NOTE: the excerpt is cut off inside the next definition; the visible part is
# kept verbatim because the rest of its body is not shown.
# def key_entry_list_count(handle: KeyEntryListHandle) -> int:
#     len = c_int32()
#     invoke(...
#
# pubsub.py
Source:pubsub.py  
# NOTE: this excerpt starts inside a helper whose header is not visible. Its
# visible tail is kept verbatim:
#         params = json.loads(params)
#     for key in params:
#         option[key] = params[key]
#     return option


def invoke_async(coro):
    """Run ``coro`` to completion on the current event loop and return its result."""
    return asyncio.get_event_loop().run_until_complete(coro)


class Pubsub(object):
    """
    PSDCNv3 Pubsub client.
    Can be used for both publishers and subscribers.

    :ivar face: a face (a connection to a forwarder, known as `app` in python-ndn)
    :ivar pub_prefix (optional): prefix of the publisher for data exchange with the broker
    """

    # Total number of attempts pubdata makes before giving up.
    _PUBDATA_ATTEMPTS = 3

    def __init__(self, face, svc_name=None, pub_prefix="/pub1_prefix"):
        self.face = self.app = face
        self.keeper = PSKCmd(svc_name)
        self.pub_prefix = pub_prefix

    def _request(self, command, int_param, app_param):
        """Express an interest for ``command`` and return the JSON-decoded reply.

        Any exception raised by the face or by decoding propagates to the caller.
        """
        _int_name, _meta, content = invoke_async(
            self.face.express_interest(
                Name.from_str(command), interest_param=int_param, app_param=app_param
            )
        )
        return json.loads(bytes(content).decode())

    def pubadv(self, dataname, onSuccess, onFailure, params=None):
        """
        Pubsub function for issuing publish-advertise requests.

        :param dataname: data name which publications will be made to.
        :type dataname: str
        :param onSuccess:
            callback function of dataname which is called when the request was successful
        :type onSuccess: function of (str) -> None
        :param onFailure:
            callback function of dataname and a reason (message) of error
                which is called when the request was unsuccessful.
        :type onFailure: function of (str, str) -> None
        :param params: dict of keys from 'topicscope', 'storagetype', 'storageprefix' and 'redefine'
            where 'topicscope' is scope of the topic (TopicScope.GLOBAL/default, TopicScope.LOCAL)
                  'storagetype' is the type of storage for the data name
                      (StorageType.BROKER/default, StorageType.PUBLISHER, StorageType.DIFS)
                  'storageprefix' is the prefix for the publisher storage or DIFS storage.
                      (Must be provided with `storagetype` being StorageType.PUBLISHER or
                       StorageType.DIFS.)
                  'redefine' is a flag to inform if redefinition is allowed. (False)
            Omitting it is the same as passing an empty dict.
        :type params: dict or a JSON string
        :return: True (success) or False (failure)
        :rtype: bool
        """
        # Preamble
        dataname = normalize(dataname)
        pubadvinfo = populate(PubAdvInfo(dataname), {} if params is None else params)
        # A storage prefix only makes sense for PUBLISHER or DIFS storage.
        if pubadvinfo['storageprefix'] and pubadvinfo['storagetype'] == StorageType.BROKER:
            onFailure(dataname,
                f"Storagetype not PUBLISHER or DIFS given {pubadvinfo['storageprefix']}")
            return False
        # Make pubadv command
        command, int_param, app_param = self.keeper.make_pubadv_cmd(
            self.keeper.svc_name, dataname, pubadvinfo=pubadvinfo)
        # Fire the command
        try:
            content = self._request(command, int_param, app_param)
            if content['status'] == 'OK':
                onSuccess(dataname)
                return True
            onFailure(dataname, content.get('reason', "Unknown pubadv error"))
            return False
        except Exception as e:
            onFailure(dataname, f"{type(e).__name__}")
            return False

    def pubunadv(self, dataname, onSuccess, onFailure, params=None):
        """
        Pubsub function for issuing publish-unadvertise requests.

        :param dataname: data name which publications will be made to.
        :type dataname: str
        :param onSuccess:
            callback function of dataname which is called when the request was successful
        :type onSuccess: function of (str) -> None
        :param onFailure:
            callback function of dataname and a reason (message) of error
                which is called when the request was unsuccessful.
        :type onFailure: function of (str, str) -> None
        :param params: dict of keys from 'topicscope', and 'allow_undefined'
            where 'topicscope' is the scope of topic (TopicScope.GLOBAL/default, TopicScope.LOCAL)
                  'allow_undefined' is a flag to inform
                      if unadvertising an undefined name is allowed. (True)
            Omitting it is the same as passing an empty dict.
        :type params: dict or JSON string
        :return: True (success) or False (failure)
        :rtype: bool
        """
        # Preamble
        dataname = normalize(dataname)
        pubadvinfo = populate(PubAdvInfo(dataname), {} if params is None else params)
        # Make pubunadv command
        command, int_param, app_param = self.keeper.make_pubunadv_cmd(
            self.keeper.svc_name, dataname, pubadvinfo=pubadvinfo)
        # Fire the command
        try:
            content = self._request(command, int_param, app_param)
            # A non-OK reply still counts as success when 'allow_undefined' is set.
            if content['status'] == 'OK' or pubadvinfo['allow_undefined']:
                onSuccess(dataname)
                return True
            onFailure(dataname, content.get('reason', "Unknown error"))
            return False
        except Exception as e:
            onFailure(dataname, f"{type(e).__name__}")
            return False

    def pubdata(self, dataname, seq, item, onSuccess, onFailure):
        """
        Pubsub function for issuing bulk data publication requests.

        :param dataname: data name which publications will be made to.
        :type dataname: str
        :param seq: sequence number
        :type seq: int
        :param item: data item to be published
        :type item: Any (that can be stringified); bytes are sent unchanged
        :param onSuccess:
            callback function of dataname and seq which is called when the request was successful
        :type onSuccess: function of (str, int) -> None
        :param onFailure:
            callback function of dataname, seq and a reason (message) of error
                which is called when the request was unsuccessful.
        :type onFailure: function of (str, int, str) -> None
        :return: True (success) or False (failure)
        :rtype: bool
        """
        # Preamble
        dataname = normalize(dataname)
        seq = int(seq)
        if isinstance(item, (bytes, bytearray)):
            payload = bytes(item)
        else:
            payload = str(item).encode()

        # Handler for data item publication
        def send_data(int_name, int_param, app_param):
            self.face.put_data(int_name, content=payload, freshness_period=1)

        # Add a route for data transmission
        try:
            invoke_async(self.face.register(dataname, send_data))
        except Exception:
            onFailure(dataname, seq, f"Couldn't register route {dataname}")
            return False
        # Do the publication, trying up to _PUBDATA_ATTEMPTS times in total
        for _attempt in range(self._PUBDATA_ATTEMPTS):
            # Reset per attempt; otherwise one failure makes every retry fail too.
            success = True
            try:
                # Make transmission request and wait for completion
                pubdatainfo = PubDataInfo(
                    data_prefix=dataname, data_sseq=seq, data_eseq=seq,
                    pub_prefix=self.pub_prefix)
                command, int_param, app_param = \
                    self.keeper.make_pubdata_cmd(self.keeper.svc_name, dataname,
                        seq, pubdatainfo)
                content = self._request(command, int_param, app_param)
                # Transmission completed
                if content['status'] != 'OK' or int(content['value']) != 1:
                    success = False
                    reason = content.get('reason', 'Unknown pubdata error')
                if content['status'] != 'OK':   # Hopeless, stop retrying
                    break
            except (InterestNack, InterestTimeout):
                success = False
                reason = "Broker unreachable or timeout"
            except Exception as e:
                success = False
                reason = f"{type(e).__name__} {str(e)}"
            if success:
                break
        # Unregister route (best effort: a failure here must not mask the result)
        try:
            invoke_async(self.face.unregister(dataname))
        except Exception:
            pass
        if success:
            onSuccess(dataname, seq)
            return True
        onFailure(dataname, seq, reason)
        return False

    def subtopic(self, topicname, onSuccess, onFailure, params=None):
        """
        Pubsub function for issuing topic-subscription requests.

        :param topicname: topic name which subscriptions is made against.
            May include MQTT-style wildcard characters such as + and #.
        :type topicname: str
        :param onSuccess:
            callback function of topicname and {dataname: [rn_name]} which is called when
                the request was successful
        :type onSuccess: function of (str, {str: [str]}) -> None
        :param onFailure:
            callback function of topicname and a reason (message) of error
                which is called when the request was unsuccessful.
        :type onFailure: function of (str, str) -> None
        :param params: dict of keys from 'servicetoken', and 'exclude'
            where 'servicetoken' is magic token for service validation, and
                  'exclude' is prefixes of data names to exclude from the result
            Omitting it is the same as passing an empty dict.
        :type params: dict or a JSON string
        :return: True (success) or False (failure)
        :rtype: bool
        """
        # Preamble
        topicname = normalize(topicname)
        subinfo = populate(SubInfo(), {} if params is None else params)
        # Make subtopic command
        command, int_param, app_param = \
            self.keeper.make_subtopic_cmd(self.keeper.svc_name, normalize(topicname),
                subinfo=subinfo)
        # Fire the command
        try:
            content = self._request(command, int_param, app_param)
        except Exception as e:
            onFailure(topicname, f"{type(e).__name__}")
            return False
        if content['status'] != 'OK':
            onFailure(topicname, content.get('reason', "Unknown subtopic error"))
            return False
        values = content['value']
        # Process exclusion list
        exclude = subinfo['exclude']
        if not exclude:
            onSuccess(topicname, {v[0]: v[1] for v in values})
            return True
        if not isinstance(exclude, list):
            exclude = [str(exclude)]
        onSuccess(topicname, {v[0]: v[1] for v in values
                                         if not any(v[0].startswith(e) for e in exclude)})
        return True

    def submani(self, dataname, rn_name, onSuccess, onFailure):
        """
        Pubsub function for issuing data-manifest requests.

        :param dataname: data name where the data manifest will be found.
            Should be one of the valid data names of `rn_names` obtained by
            a recent `subtopic` request.
        :type dataname: str
        :param rn_name: rn name which the data will be fetched from.
            Should be a valid rn name obtained by a recent `subtopic` request.
        :type rn_name: str
        :param onSuccess:
            callback function of dataname and (rn_name, fst, lst)
                which is called when the request was successful
        :type onSuccess: function of (str, (str, int, int)) -> None
        :param onFailure:
            callback function of dataname and a reason (message) of error
                which is called when the request was unsuccessful.
        :type onFailure: function of (str, str) -> None
        :return: True (success) or False (failure)
        :rtype: bool
        """
        # Preamble
        dataname = normalize(dataname)
        rn_name = normalize(rn_name)
        command, int_param, app_param = self.keeper.make_submani_cmd(dataname, rn_name)
        try:
            content = self._request(command, int_param, app_param)
            if content['status'] == 'OK':
                onSuccess(dataname, (rn_name, content['fst'], content['lst']))
                return True
            # Fall through so the failure callback receives this reason.
            reason = content.get('reason', f"Name {dataname} not found in store")
        except Exception as e:
            reason = f"{type(e).__name__} {str(e)}"
        onFailure(dataname, reason)
        return False

    def sublocal(self, topicname, onSuccess, onFailure, params=None):
        """
        Pubsub function for issuing topic-subscription to local broker requests.

        :param topicname: topic name which subscriptions is made to.
            Can also include MQTT-style wildcard characters such as + and #.
        :type topicname: str
        :param onSuccess:
            callback function of rn_name and [(dataname, fst, lst)] which is called when
                the request was successful
        :type onSuccess: function of (str, [(str, int, int)]) -> None
        :param onFailure:
            callback function of topicname and a reason (message) of error
                which is called when the request was unsuccessful.
        :type onFailure: function of (str, str) -> None
        :param params: dict of keys from 'servicetoken', and 'exclude'
            where 'servicetoken' is magic token for service validation, and
                  'exclude' is prefix(es) of data names to exclude from the result
            Omitting it is the same as passing an empty dict.
        :type params: dict or a JSON string
        :return: True (success) or False (failure)
        :rtype: bool
        """
        # Preamble
        topicname = normalize(topicname)
        subinfo = populate(SubInfo(topicscope=TopicScope.LOCAL),
                           {} if params is None else params)
        # Make sublocal command
        command, int_param, app_param = \
            self.keeper.make_subtopic_cmd(self.keeper.svc_name, normalize(topicname),
                local=True, subinfo=subinfo)
        # Fire the command
        try:
            content = self._request(command, int_param, app_param)
        except Exception as e:
            onFailure(topicname, f"{type(e).__name__}")
            return False
        if content['status'] != 'OK':
            onFailure(topicname,
                      content.get('reason', f"No matches for topic {topicname}"))
            return False
        # Process exclusion list
        values = content['value']['manifests']
        exclude = subinfo['exclude']
        if exclude:
            if not isinstance(exclude, list):
                exclude = [str(exclude)]
            values = [v for v in values if not any(v[0].startswith(e) for e in exclude)]
        # All done
        onSuccess(content['value']['broker'], values)
        return True

    def subdata(self, dataname, seq, forward_to, lifetime=None):
        """
        Pubsub function for issuing data requests.

        :param dataname: data name where the data will be found.
            Should be one of the valid names obtained by a recent `subtopic` request.
        :type dataname: str
        :param seq: index of the data published under `dataname`.
        :type seq: int
        :param forward_to: forwarding hint information as the prefix of a broker.
        :type forward_to: str
        :param lifetime: time extent until which the interest should be kept alive.
        :type lifetime: int (in milliseconds)
        :return: Data found under the given data name and sequence number.
            None if no such data exists.
            (InterestNack or InterestTimeout exceptions can also be raised.)
        """
        command, int_param, app_param = \
            self.keeper.make_subdata_cmd(dataname, seq, forward_to, lifetime)
        # NOTE(review): app_param is not forwarded here, unlike the other
        # requests -- confirm this is intentional.
        int_name, meta, content = invoke_async(
            self.face.express_interest(Name.from_str(command), interest_param=int_param)
        )
        return bytes(content) if content else None


# (the pubsub.py excerpt is truncated here)
# GS_Bot_Commands.py
Source:GS_Bot_Commands.py  
...36            return text, attachments37        dot = "digraph G {\n" + code[1] + "\n }"38        text        = ":information_source:  Rending dot code with size: {0}".format(len(dot))39        attachments = []40        Lambda('utils.dot_to_slack').invoke_async({'dot': dot , 'channel' : channel_id})41        return text, attachments42    # refactor into separate class43    @staticmethod44    def plantuml(slack_event, params=None):45        text    = slack_event["text"]46        channel = slack_event["channel"]47        code    = text.split("```")48        if len(code) != 3:49            text = '*GS Bot command execution error :exclamation:*'50            attachments = [{"text": 'you need to provide the code to red inside *```* blocks', 'color': 'danger'}]51            return text, attachments52        puml = "@startuml \n" + code[1] + "\n@enduml"53        text        = ":information_source:  Rending puml code with size: {0}".format(len(puml))54        attachments = [{"text": '```{0}```'.format(puml), 'color': 'good'}]55        Lambda('utils.puml_to_slack').invoke_async({'puml': puml, 'channel': channel})56        return text, attachments57    #move to new routing mode58    @staticmethod59    def browser(slack_event, params=None):60        Lambda('osbot_browser.lambdas.lambda_browser').invoke_async({'params': params, 'data': slack_event})61        return None, None62    # move to new routing mode63    @staticmethod64    def gdocs(slack_event, params=None):65        Lambda('osbot_gsuite.lambdas.gdocs').invoke_async({'params': params, 'data': slack_event})66        return None, None67    @staticmethod68    def mindmap(slack_event, params=None):69        channel = slack_event.get('channel')70        team_id = slack_event.get('team_id')71        if len(params) < 1:72            text = ':red_circle: Hi, for the `mindmap` command, you need to provide an `graph_name`'73            slack_message(text, [], channel, team_id)74            return None, None75        
graph_name = params.pop(0)76        graph_params = ['go_js', graph_name, 'mindmap']77        graph_params.extend(params)78        Lambda('osbot_browser.lambdas.lambda_browser').invoke_async({"params": graph_params, 'data': {'team_id': team_id, 'channel': channel}})79        return None, None80    81    @staticmethod82    def graph(slack_event, params=None):83        Lambda('osbot_jira.lambdas.graph').invoke_async({'params': params, 'data': slack_event}) , []84        return None, None85    # move to new routing mode86    # @staticmethod     # add when there are more commands in there87    # def calendar(slack_event, params=None):88    #     Lambdas('gs.lambda_calendar').invoke_async({'params': params, 'data': slack_event})89    #     return (None, None)90    @staticmethod91    def jupyter(slack_event, params=None):92        Lambda('osbot_jupyter.lambdas.osbot').invoke_async({'params': params, 'data': slack_event}), []93        return None, None94    @staticmethod95    def slides(slack_event, params=None):96        Lambda('osbot_gsuite.lambdas.slides').invoke_async({'params': params, 'data': slack_event})97        return (None, None)98    # move to new routing mode99    @staticmethod100    def sheets(slack_event, params=None):101        Lambda('gs.lambda_sheets').invoke_async({'params': params, 'data': slack_event})102        return (None, None)103    # move to new routing mode104    @staticmethod105    def jira(slack_event, params=None):106        Lambda('osbot_jira.lambdas.elastic_jira').invoke_async({"params": params , "user": slack_event.get('user') , "channel": slack_event.get('channel'), 'team_id': slack_event.get('team_id') },)107        #Lambda('pbx_gs_python_utils.lambdas.gs.elastic_jira').invoke_async({"params": params , "user": slack_event.get('user') , "channel": slack_event.get('channel'), 'team_id': slack_event.get('team_id') },)108        return None, None109    @staticmethod110    def time(slack_event, params=None):111        user = 
slack_event.get('user')112        return 'Hi <@{0}>, the time now is: {1}'.format(user, datetime.now()), []113    @staticmethod114    def version(slack_event, params=None):115        return GS_Bot_Commands.gsbot_version,[]116    # @staticmethod117    # def reload_jira_lambda(slack_event=None, params=None):118    #     Lambdas('pbx_gs_python_utils.lambdas.gs.elastic_jira').update_with_src()...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
