Best Python code snippet using Testify_python
symbolic_value.py
Source:symbolic_value.py  
...55    plus any additional input values56    '''57    discovered_additional_input_values = []58    discovered_operations = []59    def discover_from(v):60        if not hasattr(v, 'owner'):61            return62        if v in input_values:63            return64        if v.owner is None:65            if v not in discovered_additional_input_values:66                discovered_additional_input_values.append(v)67        elif v.owner not in discovered_operations:68            discovered_operations.append(v.owner)69            for v_inp in v.owner.inputs:70                discover_from(v_inp)71    for v in output_values:72        discover_from(v)73    return (sort_operations(discovered_operations),74            discovered_additional_input_values)75def sort_operations(unsorted_operations):76    'Sort operations so that they can be performed in order'77    sorted_operations = []78    def is_computable(v):79        return (not _is_like_sa_value(v) or80                v.owner is None or81                v.owner not in unsorted_operations)82    while len(unsorted_operations):83        removed_any = False84        for op in copymodule.copy(unsorted_operations):85            if all([is_computable(inp) for inp in op.inputs]):86                unsorted_operations.remove(op)...app.py
Source:app.py  
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3from __future__ import absolute_import4from __future__ import unicode_literals5from py_privatekonomi.utilities import helper6from py_privatekonomi.utilities.common import as_obj, is_string7from py_privatekonomi.utilities.proxy import HookProxy8from py_privatekonomi.utilities.models import rebuild_tables, create_tables9from py_privatekonomi.core import loader10from py_privatekonomi.core.error import (MissingAppFunctionError, FormatterError, ParserError)11import py_privatekonomi.core.db12from py_privatekonomi.core.mappers.economy_mapper import EconomyMapper13import copy14class AppProxy(HookProxy):15    def __init__(self, objname, obj):16        super(AppProxy, self).__init__(objname, obj)17        self.__called = []18    def _pre( self, name, *args, **kwargs ):19        self.__called.append(name)20    def _post(self, name, *args, **kwargs ):21        pass22    def _pre_run(self, *args, **kwargs ):23        if super(AppProxy, self).getObj().isBuilt() and len(self.__called) > 0:24            if len(self.__called) == 1 and self.__called[0] == 'run':25                return26            raise Exception("Following methods called without building first: %s" % (repr(self.__called)))27    def _post_run(self, *args, **kwargs):28        self.__called = []29    def _post_build(self, *args, **kwargs):30        self.__called = []31    def __repr__(self):32        return super(AppProxy, self).getObj().__repr__()33class App(object):34    def __init__(self):35        self.__formatter = None36        self.__parser = None37        self.__sources = []38        self.__persist = False39        self.__config =  {}40        self.__db = {}41        self.__output = None42        self.__auto_discover = False43        self.__discover_from = None44        self.app = None45    def setFormatter(self, formatter_name):46        self.__formatter = formatter_name47    def setParser(self, parser_name):48        self.__parser = parser_name49    def addSource(self, source_name):50        self.__sources.append(source_name)51    def addSources(self, source_names):52        self.__sources.extend(source_names)53    def clearSources(self):54        self.__sources = []55    def persistWith(self, database_settings):56        self.__persist = True57        self.__db = database_settings58    def setOutput(self, output):59        """ Setting the output will avoid calling execute, but60        may still trigger a call to persist.61        If output is set, the building process will not consider62        formatter, parser, or source(s) """63        self.__output = output64    def autodiscover(self, discover_from):65        """ This will attempt to guess the formatter66            and parser given a list of parsers and formatters of format:67            discover_from = [68                {69                    'formatter' : 'swedbank',70                    'parser' : 'swedbank'71                }, { ... }72            ]73        """74        if self.__formatter is not None:75            raise Exception("Unable to autodiscover; Formatter is already set to: %s" % (self.__formatter))76        if self.__parser is not None:77            raise Exception("Unable to autodiscover; Parser is already set to: %s" % (self.__parser))78        self.__auto_discover = True79        self.__discover_from = discover_from80    def config(self, conf):81        self.__config.update(conf)82    def isBuilt(self):83        return self.app is not None84    def __set_parser(self, core):85        self.app['parser'] = None86        if self.__parser is not None:87            self.app['parser'] = self.__load_parser(self.__parser, core)88    def __set_formatter(self, core):89        self.app['formatter'] = None90        if self.__formatter is not None:91            self.app['formatter'] = self.__load_formatter(self.__formatter, core)92    def __load_formatter(self, formatter_name, core):93        return loader.load_formatter(formatter_name, core['factories']['formatters']['account_formatter_factory'])94    def __load_parser(self, parser_name, core):95        return loader.load_parser(parser_name, core['factories']['parsers']['account_parser_factory'])96    def __autodiscover(self, core):97        self.app['formatter'] = None98        self.app['parser'] = None99        for discover in self.__discover_from:100            formatter = self.__load_formatter(discover['formatter'], core)101            if formatter is not None:102                parser = self.__load_parser(discover['parser'], core)103                if parser is not None:104                    self.app['formatter'] = formatter105                    self.app['parser'] = parser106                    return True107        return False108    def _rebuildTables(self, raw_models=None, customizations={}):109        _customizations = {}110        if 'customizations' in self.__config:111            _customizations = self.__config['customizations']112        apply_customizations = _customizations.copy()113        apply_customizations.update(customizations)114        if raw_models is None:115            raw_models = loader.load_models(EconomyMapper.getModelNames())116        return rebuild_tables(raw_models, apply_customizations)117    def _createTables(self, raw_models=None, customizations={}):118        _customizations = {}119        if 'customizations' in self.__config:120            _customizations = self.__config['customizations']121        apply_customizations = _customizations.copy()122        apply_customizations.update(customizations)123        if raw_models is None:124            raw_models = loader.load_models(EconomyMapper.getModelNames())125        return create_tables(raw_models, customizations)126    def build(self):127        self.app = {}128        core = loader.load_core()129        self.app['core'] = core130        if len(self.__db) > 0:131            self.__config['database'] = self.__db132        if self.__output is None and self.__auto_discover is False:133            if self.__formatter is None:134                raise Exception("Formatter has not been specified, please call setFormatter, setOutput, or autodiscover")135            if self.__parser is None:136                raise Exception("Parser has not been specified, please call setParser, setOutput, or autodiscover")137            if len(self.__sources) == 0:138                raise Exception("Sources have not been specified, please call addSource, setOutput, or autodiscover")139            self.__set_parser(self.app['core'])140            self.__set_formatter(self.app['core'])141        elif self.__auto_discover is True:142            found = self.__autodiscover(self.app['core'])143            if not found:144                raise Exception("Unable to find parser/formatter from %s" % (repr(self.__discover_from)))145        if self.__persist is True:146            if self.__parser is None or self.__formatter is None:147                raise Exception("persist requires formatter/parser to be set, please call setParser and/or setFormatter")148            try:149                customizations = loader.load_customizations(self.__parser)150                self.config({151                    'customizations' : customizations152                })153            except ImportError as e:154                pass155        return self156    def run(self):157        def __execute():158            errors = []159            if self.__auto_discover is True:160                found = False161                for discover in self.__discover_from:162                    try:163                        parser = self.__load_parser(discover['parser'], self.app['core'])164                        formatter = self.__load_formatter(discover['formatter'],  self.app['core'])165                        if parser is None or formatter is None:166                            continue167                        output = self.execute(168                            sources=self.__sources,169                            parser=parser,170                            formatter=formatter,171                            configs=as_obj(self.__config))172                        self.app['parser'] = discover['parser']173                        self.app['formatter'] = discover['formatter']174                        found = True175                        break176                    except FormatterError as e:177                        e_ = repr(e)178                        e_ += "(%s)" % (repr(self.__sources))179                        errors.append(e_)180                        continue181                    except ParserError as e:182                        e_ = repr(e)183                        e_ += "(%s)" % (repr(self.__sources))184                        errors.append(e_)185                        continue186                if found is False:187                    raise Exception("Unable to parse/format using available parsers and formatters from %s (errors=%s)" % (repr(self.__discover_from), repr(errors)))188            else:189                output = self.execute(self.__sources, self.app['parser'], self.app['formatter'], as_obj(self.__config))190            return output191        if self.app is None:192            raise Exception("Build app using app.build() before running.")193        ret = {}194        if self.__output is not None:195            ret['execute'] = self.__output196            ret['formatter'] = 'unknown'197            ret['parser'] = 'unknown'198        else:199            if 'execute' not in dir(self):200                raise MissingAppFunctionError(capture_data={201                    'fun_name' : 'execute',202                    'app' : self.app203                })204            ret['execute'] = __execute()205            if not is_string(self.app['formatter']):206                ret['formatter'] = self.app['formatter'].getName()207            else:208                ret['formatter'] = self.app['formatter']209            if not is_string(self.app['parser']):210                ret['parser'] = self.app['parser'].getName()211            else:212                ret['parser'] = self.app['parser']213        if self.__persist is True:214            if 'persist' not in dir(self):215                raise MissingAppFunctionError(capture_data={216                    'fun_name' : 'persist',217                    'app' : self.app218                })219            try:220                py_privatekonomi.core.db.DB().connect(self.__config['database'])221            except AttributeError as e:222                raise Exception("Unable to connect to database: inaccurate database settings.", e)223            ret['persist'] = self.persist(ret['execute'], as_obj(self.__config))224        return ret225    def clear_password(self, v):226        if isinstance(v, dict):227            if "password" in v:228                del v["password"]229                v['password'] = '<censored>'230            for ele in v.values():231                self.clear_password(ele)232    def __repr__(self):233        config = self.__config234        config = copy.deepcopy(config)235        self.clear_password(config)...incident.py
Source:incident.py  
1from time import time, gmtime, strftime2from pdpyras import APISession3def incident_iter_selected(api_key, duration, service_ids, integrations, all_tags):4    api_session = APISession(api_key)5    durations = {"0": 30, "1": 60, "2": 90, "3": 120, "4": 150, "5": 180, "6": 210, "7": 240, "8": 270, "9": 300,6                 "10": 330, "11": 360, "12": 440, "13": 720, "14": 900, "15": 1080}7    incidents = get_incidents(durations[duration], api_session, service_ids, integrations, all_tags)8    print("Found %s for Service %s for %s months with integration: %s" % (\9        str(len(incidents)), service_ids[0], str(int(duration) + 1), integrations))10    return incidents11def get_incidents(duration, api_session, service_ids, integrations, all_tags):12    incidents = []13    for i in range(30, duration + 30, 30):14        disco_param = discovery_params(i, service_ids)15        temp_incidents = iter_incidents(api_session, disco_param, integrations, all_tags)16        incidents = incidents + temp_incidents17    return incidents18def discovery_params(i, services):19    current_window = i + 5 if i == 360 else i20    time_today = int(time()) - ((i - 30) * 24 * 60 * 60)21    time_to = int(time()) - (current_window * 24 * 60 * 60)22    discover_from = strftime('%Y-%m-%dT%H:%M:%S-00', gmtime(time_today))23    discover_to = strftime('%Y-%m-%dT%H:%M:%S-00', gmtime(time_to))24    window_param = {'since': discover_to, 'until': discover_from, 'service_ids[]': [services], 'time_zone': 'UTC',25                    'include[]': ['first_trigger_log_entries']}26    return window_param27def iter_incidents(api_session, window_param, integrations, all_tags):28    all_incidents = []29    ignored = []30    count = 031    # Making PagerDuty API calls for Incidents in this section32    for current_incident in api_session.iter_all('incidents', params=window_param, paginate=True):33        temp_incident = current_incident34        alerts = api_session.rget('incidents/%s/alerts' % current_incident['id'])35        try:36            temp_incident["all_alerts"] = alerts["alerts"]37        except TypeError:38            temp_incident["all_alerts"] = alerts39        if "[REDACTED] by" in current_incident['description']:40            continue41        ftle_channel = current_incident["first_trigger_log_entry"]["channel"]42        # print('--------\n',integrations)43        # print(ftle_channel['details'], '\n')44        if integrations.lower() == 'datadog' and 'tags' in ftle_channel['details']:45            tags = current_incident["first_trigger_log_entry"]["channel"]["details"]["tags"]46        elif integrations.lower() == 'dynatrace' and 'Tags' in ftle_channel['details']:47            tags = current_incident["first_trigger_log_entry"]["channel"]["details"]["Tags"]48        elif integrations.lower() == 'nagios' and 'host' in ftle_channel:49            tags = 'hostname:' + current_incident["first_trigger_log_entry"]["channel"]["host"]50        elif integrations.lower() == 'checkmk' and 'host' in ftle_channel:51            tags = current_incident["first_trigger_log_entry"]["channel"]["host"]52        else:53            tags = ""54        if tags:55            extract_tags(tags, all_tags)56        temp_incident["tags"] = tags57        temp_incident["integration"] = integrations58        all_incidents.append(temp_incident)59    return all_incidents60def extract_tags(current_tags, all_tags):61    first_layer_tags = current_tags.split(",")62    if "untagged" not in all_tags:63        all_tags["untagged"] = []64    for tag in first_layer_tags:65        tag_extract = tag.strip().replace(" ", "_").split(":")66        if len(tag_extract) == 1 and tag_extract[0] not in all_tags["untagged"]:67            all_tags["untagged"].append(tag_extract[0])68        elif len(tag_extract) == 2:69            if tag_extract[0] not in all_tags:70                all_tags[tag_extract[0]] = [tag_extract[1]]71            elif tag_extract[1] not in all_tags[tag_extract[0]]:72                all_tags[tag_extract[0]].append(tag_extract[1])73        elif len(tag_extract) == 3:74            if tag_extract[0] not in all_tags:75                all_tags[tag_extract[0]] = [tag_extract[1] + tag_extract[2]]76            elif tag_extract[1] + tag_extract[2] not in all_tags[tag_extract[0]]:77                all_tags[tag_extract[0]].append(tag_extract[1] + tag_extract[2])78# api_key = "ozhUyFftDxYFTR2rsVWQ"79# service_ids = ['PG8L64X', 'PYZQ56E']80# integrations = ['Datadog', 'Dynatrace']81# durations = ['5', '9', '10']82# print("---GETTING INCIDENTS---")83# incidents = []84# tags = {}85# for i in range(len(service_ids)):86#     print("---Getting incidents for %s in %s month(s)---" % (service_ids[i], str(int(durations[i]) + 1)))87#     incidents += incident_iter_selected(api_key, durations[i], service_ids[i], integrations[i], tags)88#89# print("---GOT INCIDENTS---")90#91# count = 192# for incident in incidents:93#     print(count, incident)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
