Best Python code snippet using robotframework-pageobjects_python
registry.py
Source:registry.py  
1# Licensed under a 3-clause BSD style license - see LICENSE.rst2"""3VO Queries4"""5from __future__ import print_function, division6#from IPython.core.debugger import Tracer7from astroquery.query import BaseQuery8from IPython.display import Markdown, display9from . import utils10__all__ = ['Registry', 'RegistryClass']11class RegistryClass(BaseQuery):12    """13    Registry query class.14    """15    def __init__(self):16        super(RegistryClass, self).__init__()17        self._TIMEOUT = 60 # seconds18        self._RETRIES = 2 # total number of times to try19        self._REGISTRY_TAP_SYNC_URL = "http://vao.stsci.edu/RegTAP/TapService.aspx/sync"20    def query(self, **kwargs):21        adql = self._build_adql(**kwargs)22        if adql is None:23            raise ValueError('Unable to compute query based on input arguments.')24        if kwargs.get('verbose'):25            print('Registry:  sending query ADQL = {}\n'.format(adql))26        url = self._REGISTRY_TAP_SYNC_URL27        tap_params = {28            "request": "doQuery",29            "lang": "ADQL",30            "query": adql31        }32        response = utils.try_query(url, post_data=tap_params, timeout=self._TIMEOUT, retries=self._RETRIES)33        if kwargs.get('verbose'):34            print('Queried: {}\n'.format(response.url))35        aptable = utils.astropy_table_from_votable_response(response)36        return aptable37    # TBD support list of wavebands38    # TBD maybe support raw ADQL clause (or maybe we should just make39    # sure they can call a basic TAP query)40    def _build_adql(self, **kwargs):41        # Default values42        service_type = ""43        keyword = ""44        not_keyword = ""45        waveband = ""46        source = ""47        publisher = ""48        order_by = ""49        logic_string = " and "50        # Find the keywords we recognize51        for key, val in kwargs.items():52            if key == 'service_type':53                service_type = val54            elif key == 'keyword':55                keyword = val56            elif key == 'not_keyword':57                not_keyword = val58            elif key == 'waveband':59                waveband = val60            elif key == 'source':61                source = val62            elif key == 'publisher':63                publisher = val64            elif key == 'order_by':65                order_by = val66            elif key == 'logic_string':67                logic_string = val68        ##69        if "image" in service_type.lower():70            service_type = "simpleimageaccess"71        elif "spectr" in service_type.lower():72            service_type = "simplespectralaccess"73        elif "cone" in service_type.lower():74            service_type = "conesearch"75        elif 'tap' in service_type or 'table' in service_type:76            service_type = "tableaccess"77        else:78            print("ERROR: please give a service_type that is one of image, spectral, cone, or table")79            return None80        query_retcols = """81          select res.waveband,res.short_name,cap.ivoid,res.res_description,82          intf.access_url,res.reference_url,res_role.role_name as publisher,cap.cap_type as service_type83          from rr.capability as cap84            natural join rr.resource as res85            natural join rr.interface as intf86		    natural join rr.res_role as res_role87            """88        query_where = " where "89        wheres = []90        if service_type != "":91            wheres.append("cap.cap_type like '%{}%'".format(service_type))92        #currently not supporting SIAv2 in SIA library.93        if service_type == 'simpleimageaccess':94            wheres.append("standard_id != 'ivo://ivoa.net/std/sia#query-2.0'")95        if source != "":96            wheres.append("cap.ivoid like '%{}%'".format(source))97        if waveband != "":98            if ',' in waveband:99                allwavebands = []100                for w in waveband.split(','):101                    allwavebands.append("res.waveband like '%{}%' ".format(w).strip())102                wheres.append("(" + " or ".join(allwavebands) + ")")103            else:104                wheres.append("res.waveband like '%{}%'".format(waveband))105        wheres.append("res_role.base_role = 'publisher'")106        if publisher != "":107            wheres.append("res_role.role_name like '%{}%'".format(publisher))108        if keyword != "":109            keyword_where = """110             (res.res_description like '%{}%' or111            res.res_title like '%{}%' or112            cap.ivoid like '%{}%')113            """.format(keyword, keyword, keyword)114            wheres.append(keyword_where)115        if not_keyword != "":116            notkey_where="""117            not (res.res_description like '%{}%' or118            res.res_title like '%{}%' or119            cap.ivoid like '%{}%')120            """.format(not_keyword, not_keyword, not_keyword)121        query_where = query_where+logic_string.join(wheres)122        if order_by != "":123            query_order = "order by {}".format(order_by)124        else:125            query_order = ""126        query = query_retcols+query_where+query_order127        return query128    def query_counts(self, field, minimum=1, **kwargs):129        adql = self._build_counts_adql(field, minimum)130        if kwargs.get('verbose'):131            print('Registry:  sending query ADQL = {}\n'.format(adql))132        url = self._REGISTRY_TAP_SYNC_URL133        tap_params = {134            "request": "doQuery",135            "lang": "ADQL",136            "query": adql137        }138        response = self._request('POST', url, data=tap_params, cache=False)139        if kwargs.get('verbose'):140            print('Queried: {}\n'.format(response.url))141        aptable = utils.astropy_table_from_votable_response(response)142        return aptable143    def _build_counts_adql(self, field, minimum=1):144        field_table = None145        field_alias = field146        query_where_filter = ''147        if field.lower() == 'waveband':148            field_table = 'rr.resource'149        elif field.lower() == 'publisher':150            field_table = 'rr.res_role'151            field = 'role_name'152            query_where_filter = ' where base_role = \'publisher\' '153        elif field.lower() == 'service_type':154            field_table = 'rr.capability'155            field = 'cap_type'156        if field_table is None:157            return None158        else:159            query_select = 'select ' + field + ' as ' + field_alias + ', count(' + field + ') as count_' + field_alias160            query_from = ' from ' + field_table161            query_where_count_min = ' where count_' + field_alias + ' >= ' + str(minimum)162            query_group_by = ' group by ' + field163            query_order_by = ' order by count_' + field_alias + ' desc'164            query = 'select * from (' + query_select + query_from + query_where_filter + query_group_by + ') as count_table' + query_where_count_min + query_order_by165            return query166    167    def display_results(self, results):168        # Display results in a readable way including the169        # short_name, ivoid, res_description and reference_url.170    171        for row in results:172            md = f"### {row['short_name']} ({row['ivoid']})"173            display(Markdown(md))174            print(row['res_description'])175            print(f"More info: {row['reference_url']} ")176            print(f"(Access URL: {row['access_url']} )")177            ...gnip_rule_generate.py
Source:gnip_rule_generate.py  
1# -*- coding: utf-8 -*-2__author__ = 'sunary'3import subprocess4import json5import pandas as pd6from utils import my_helper7class RuleGenerate():8    '''9    Generator json by gnip format from keywords10    '''11    def __init__(self):12        self.rule = {'rules': []}13    def read_csv(self, file_input):14        self.pd_file = pd.read_csv(file_input)15    def generate(self, file_input, file_output):16        self.read_csv(file_input)17        execute_date = set()18        for i in range(len(self.pd_file['NAME'])):19            if self.pd_file['ADD'][i] == 'yes':20                execute_date.add(self.pd_file['NEW STATUS'][i])21                r = {'value': '', 'tag': 'tag-' + str(i)}22                keyword = self.split_semicolon(self.pd_file['KEYWORDS'][i])23                not_keyword = self.split_semicolon(self.pd_file['NOT'][i])24                contain = self.split_semicolon(self.pd_file['REQUEST'][i])25                if keyword or not_keyword:26                    rule_keyword = ''27                    rule_not_keyword = ''28                    for con in contain:29                        if con.lower() == 'message':30                            print rule_keyword31                            if rule_keyword and keyword:32                                rule_keyword += ' OR '33                            rule_keyword += self.join_keyword(keyword= keyword, prefix= '')34                            if rule_not_keyword and not_keyword:35                                rule_not_keyword += ' OR '36                            rule_not_keyword += self.join_keyword(keyword= not_keyword, prefix= '')37                        elif con.lower() == 'description':38                            if rule_keyword and keyword:39                                rule_keyword += ' OR '40                            rule_keyword += self.join_keyword(keyword= keyword, prefix='bio_contains')41                            if rule_not_keyword and not_keyword:42                                rule_not_keyword += ' OR '43                            rule_not_keyword += self.join_keyword(keyword= not_keyword, prefix='bio_contains')44                    rule_keyword = '(' + rule_keyword + ')' if rule_keyword and rule_not_keyword else rule_keyword45                    rule_not_keyword = 'NOT (' + rule_not_keyword + ')' if rule_not_keyword else rule_not_keyword46                    r['value'] = rule_keyword + (' ' if rule_keyword and rule_not_keyword else '') + rule_not_keyword47                    if len(r['value']) > 2048:48                        return 0, self.pd_file['NAME'][i]49                    if len(r['value']) > 0:50                        self.rule['rules'].append(r)51        fo = open(file_output, 'w+')52        fo.write(json.dumps(self.rule, ensure_ascii= False))53        fo.close()54        return list(execute_date), json.dumps(self.rule, ensure_ascii= False, indent=4, sort_keys=True)55    def split_semicolon(self, keyword):56        keyword = keyword.split('; ') if not my_helper.pandas_null(keyword) else []57        if keyword:58            for i in range(len(keyword)):59                if ';' in keyword[i]:60                    keyword[i] = keyword[i].replace(';', '')61                if '"' in keyword[i]:62                    keyword[i] = keyword[i].replace('"', '')63            i = 064            while i < len(keyword):65                if not keyword[i]:66                    del keyword[i]67                else:68                    i += 169        return keyword70    def join_keyword(self, keyword = [], prefix = ''):71        gen_keyword = ['']*len(keyword)72        for i in range(len(keyword)):73            gen_keyword[i] = self.add_double_quotes(keyword[i])74        if prefix:75            for i in range(len(gen_keyword)):76                gen_keyword[i] = prefix + ':' + gen_keyword[i]77        return ' OR '.join(gen_keyword)78    def add_double_quotes(self, keyword):79        list_special = [' ', '.', '!', '%', '&', '_', ':', '+', '-', '?', '#', '@', '<', '=', '>', '(', ')']80        for char in list_special:81            if char in keyword:82                return '"' + keyword + '"'83        return keyword84    def post(self, file_output):85        subprocess.Popen('curl -v -X POST -u anh.le@sentifi.com:s1822013! https://api.gnip.com:443/accounts/Sentifi/publishers/twitter/streams/track/prod/rules.json -d @gnip.json',86                                cwd = file_output,87                                stdout=subprocess.PIPE,88                                stderr=subprocess.PIPE,89                                shell=True)90    def delete(self, file_output):91        subprocess.Popen('curl -v -X DELETE -u anh.le@sentifi.com:s1822013! https://api.gnip.com:443/accounts/Sentifi/publishers/twitter/streams/track/prod/rules.json -d @gnip.json',92                                cwd = file_output,93                                stdout=subprocess.PIPE,94                                stderr=subprocess.PIPE,95                                shell=True)96# curl -v -X POST -u anh.le@sentifi.com:s1822013! "https://api.gnip.com:443/accounts/Sentifi/publishers/twitter/streams/track/prod/rules.json" -d @gnip.json97# curl -v -X DELETE -u anh.le@sentifi.com:s1822013! "https://api.gnip.com:443/accounts/Sentifi/publishers/twitter/streams/track/prod/rules.json" -d @gnip.json98# curl -v -X POST -u anh.le@sentifi.com:s1822013! "https://api.gnip.com:443/accounts/Sentifi/publishers/twitter/streams/track/dev/rules.json" -d @gnip.json99# curl -v -X DELETE -u anh.le@sentifi.com:s1822013! "https://api.gnip.com:443/accounts/Sentifi/publishers/twitter/streams/track/dev/rules.json" -d @gnip.json100# mongoexport -h mongo6.ssh.sentifi.com -d sentifi -c tweet -f text -q '{"_id":{$gt:ObjectId("55762c800000000000000000")}, "_source": "gnip"}' --limit 500 --csv -o lastest_messages.csv101if __name__ == '__main__':102    generate = RuleGenerate()103    generate.generate('/home/nhat/data/Publisher Candidates Inventory - GNIP Rules.csv',104                      '/home/nhat/data/result/gnip.json')...deco.py
Source:deco.py  
...12#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13#  See the License for the specific language governing permissions and14#  limitations under the License.15import inspect16def not_keyword(func):17    """Decorator to disable exposing functions or methods as keywords.18    Examples::19        @not_keyword20        def not_exposed_as_keyword():21            # ...22        def exposed_as_keyword():23            # ...24    Alternatively the automatic keyword discovery can be disabled with25    the :func:`library` decorator or by setting the ``ROBOT_AUTO_KEYWORDS``26    attribute to a false value.27    New in Robot Framework 3.2.28    """29    func.robot_not_keyword = True30    return func31not_keyword.robot_not_keyword = True32@not_keyword33def keyword(name=None, tags=(), types=()):34    """Decorator to set custom name, tags and argument types to keywords.35    This decorator creates ``robot_name``, ``robot_tags`` and ``robot_types``36    attributes on the decorated keyword function or method based on the37    provided arguments. Robot Framework checks them to determine the keyword's38    name, tags, and argument types, respectively.39    Name must be given as a string, tags as a list of strings, and types40    either as a dictionary mapping argument names to types or as a list41    of types mapped to arguments based on position. It is OK to specify types42    only to some arguments, and setting ``types`` to ``None`` disables type43    conversion altogether.44    If the automatic keyword discovery has been disabled with the45    :func:`library` decorator or by setting the ``ROBOT_AUTO_KEYWORDS``46    attribute to a false value, this decorator is needed to mark functions47    or methods keywords.48    Examples::49        @keyword50        def example():51            # ...52        @keyword('Login as user "${user}" with password "${password}"',53                 tags=['custom name', 'embedded arguments', 'tags'])54        def login(user, password):55            # ...56        @keyword(types={'length': int, 'case_insensitive': bool})57        def types_as_dict(length, case_insensitive):58            # ...59        @keyword(types=[int, bool])60        def types_as_list(length, case_insensitive):61            # ...62        @keyword(types=None])63        def no_conversion(length, case_insensitive=False):64            # ...65    """66    if inspect.isroutine(name):67        return keyword()(name)68    def decorator(func):69        func.robot_name = name70        func.robot_tags = tags71        func.robot_types = types72        return func73    return decorator74@not_keyword75def library(scope=None, version=None, doc_format=None, listener=None,76            auto_keywords=False):77    """Class decorator to control keyword discovery and other library settings.78    By default disables automatic keyword detection by setting class attribute79    ``ROBOT_AUTO_KEYWORDS = False`` to the decorated library. In that mode80    only methods decorated explicitly with the :func:`keyword` decorator become81    keywords. If that is not desired, automatic keyword discovery can be82    enabled by using ``auto_keywords=True``.83    Arguments ``scope``, ``version``, ``doc_format`` and ``listener`` set the84    library scope, version, documentation format and listener by using class85    attributes ``ROBOT_LIBRARY_SCOPE``, ``ROBOT_LIBRARY_VERSION``,86    ``ROBOT_LIBRARY_DOC_FORMAT`` and ``ROBOT_LIBRARY_LISTENER``, respectively.87    These attributes are only set if the related arguments are given and they88    override possible existing attributes in the decorated class.89    Examples::90        @library91        class KeywordDiscovery:92            @keyword93            def do_something(self):94                # ...95            def not_keyword(self):96                # ...97        @library(scope='GLOBAL', version='3.2')98        class LibraryConfiguration:99            # ...100    The ``@library`` decorator is new in Robot Framework 3.2.101    """102    if inspect.isclass(scope):103        return library()(scope)104    def decorator(cls):105        if scope is not None:106            cls.ROBOT_LIBRARY_SCOPE = scope107        if version is not None:108            cls.ROBOT_LIBRARY_VERSION = version109        if doc_format is not None:...AppiumFlutterLibrary.py
Source:AppiumFlutterLibrary.py  
1import os2import base643import robot4from robot.libraries.BuiltIn import BuiltIn5from robot.api import logger6from robot.api.deco import keyword, library, not_keyword7from appium.webdriver import Remote8from appium_flutter_finder.flutter_finder import FlutterElement, FlutterFinder9@library(scope='GLOBAL', auto_keywords=True)10class AppiumFlutterLibrary:11  def __init__(self):12    self.finder = FlutterFinder()13    self._screenshot_index = 014  # PUBLIC15  @keyword('Open Application')16  def open_application(self, remote_url, **kwargs):17    desired_caps = kwargs18    self.driver = Remote(str(remote_url), desired_caps)19  @keyword('Click Element')20  def click_element(self, locator: str, strategy: str='value_key'):21    element: FlutterElement = self._get_element(locator, strategy)22    element.click()23    self._html(f'Clicking the element [{locator}]')24  @keyword('Input Text')25  def input_text(self, locator: str, text: str, strategy: str='value_key'):26    element: FlutterElement = self._get_element(locator, strategy)27    element.send_keys(text)28    self._html(f'typing [{text}] in the [{locator}] element')29  @keyword('Get Text')30  def get_text(self, locator: str, strategy: str='value_key'):31    element: FlutterElement = self._get_element(locator, strategy)32    return element.text33  @keyword('Page Back')34  def page_back(self):35    self.finder.page_back()36  @keyword('Clear Text')37  def clear_text(self, locator: str, strategy: str='value_key'):38    element: FlutterElement = self._get_element(locator, strategy)39    self._html(f'clearing text from the [{locator}] element')40    element.clear()41  @keyword('Clear Time Line')42  def clear_time_line(self):43    self.driver.execute('flutter:clearTimeline')44  @keyword('Check Health')45  def check_health(self):46    self.driver.execute('flutter:checkHealth')47  @keyword('Close Application')48  def close_application(self):49    self.driver.quit()50  @keyword('Capture Page Screenshot')51  def capture_page_screenshot(self, filename: str=None):52    path, link = self._get_screenshot_paths(filename)53    base = self.driver.get_screenshot_as_base64()54    with open(path, 'wb') as fh:55      fh.write(base64.urlsafe_b64decode(base))56    self._html('</td></tr><tr><td colspan="3"><a href="%s">''<img src="%s" width="800px"></a>' % (link, link))57    58  @keyword('Wait For Element')59  def wait_for_element(self, locator: str, timeout: int=100, strategy: str='value_key'):60    self.driver.execute_script('flutter:waitFor', self._get_locator(locator, strategy), timeout)61  # @keyword('Get Element Attribute')62  # def get_element_attribute(self, locator: str, attribute: str, strategy: str='value_key'):63  #   element: FlutterElement = self._get_element(locator, strategy)64  #   return element.get_attribute(attribute)65  # @keyword('Wait For Element Absent')66  # def wait_for_element_absent(self, locator: str, strategy: str='value_key'):67  #   self.driver.execute('flutter:waitForAbsent', self._get_locator(locator, strategy))68  # @keyword('Scroll To')69  # def scroll_to(self, locator: str, x: int, y: int, duration: int=200, strategy: str='value_key'):70  #   element: FlutterElement = self._get_element(locator, strategy)71  #   self.driver.execute('flutter:scroll', {72  #     'element': element,73  #     'dx': x,74  #     'dy': y,75  #     'durationMilliseconds': duration,76  #     'frequency': 3077  #   })78  # PRIVATE79  @not_keyword80  def _get_element(self, locator, strategy='value_key') -> FlutterElement:81    element = self._get_locator(locator, strategy)82    return FlutterElement(self.driver, element)83  @not_keyword84  def _get_locator(self, locator, strategy='value_key') -> FlutterFinder:85    if strategy == 'value_key':86      return self.finder.by_value_key(locator)87    if strategy == 'text':88      return self.finder.by_text(locator)89    if strategy == 'tooltip':90      return self.finder.by_tooltip(locator)91    if strategy == 'ancestor':92      return self.finder.by_ancestor(locator)93    if strategy == 'descendant':94      return self.finder.by_descendant(locator)95    if strategy == 'semantics':96      return self.finder.by_semantics_label(locator)97    if strategy == 'type':98      return self.finder.by_type(locator)99  # Utilities100  @not_keyword101  def _html(self, message):102    logger.info(message, True, False)103  @not_keyword104  def _get_log_dir(self):105    variables = BuiltIn().get_variables()106    logfile = variables['${LOG FILE}']107    if logfile != 'NONE':108      return os.path.dirname(logfile)109    return variables['${OUTPUTDIR}']110  @not_keyword111  def _get_screenshot_paths(self, filename):112    if not filename:113      self._screenshot_index += 1114      filename = 'appium-screenshot-%d.png' % self._screenshot_index115    else:116      filename = filename.replace('/', os.sep)117    logdir = self._get_log_dir()118    path = os.path.join(logdir, filename)119    link = robot.utils.get_link_path(path, logdir)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
