How to use discover_from method in Testify

Best Python code snippet using Testify_python

symbolic_value.py

Source:symbolic_value.py Github

copy

Full Screen

...55 plus any additional input values56 '''57 discovered_additional_input_values = []58 discovered_operations = []59 def discover_from(v):60 if not hasattr(v, 'owner'):61 return62 if v in input_values:63 return64 if v.owner is None:65 if v not in discovered_additional_input_values:66 discovered_additional_input_values.append(v)67 elif v.owner not in discovered_operations:68 discovered_operations.append(v.owner)69 for v_inp in v.owner.inputs:70 discover_from(v_inp)71 for v in output_values:72 discover_from(v)73 return (sort_operations(discovered_operations),74 discovered_additional_input_values)75def sort_operations(unsorted_operations):76 'Sort operations so that they can be performed in order'77 sorted_operations = []78 def is_computable(v):79 return (not _is_like_sa_value(v) or80 v.owner is None or81 v.owner not in unsorted_operations)82 while len(unsorted_operations):83 removed_any = False84 for op in copymodule.copy(unsorted_operations):85 if all([is_computable(inp) for inp in op.inputs]):86 unsorted_operations.remove(op)...

Full Screen

Full Screen

app.py

Source:app.py Github

copy

Full Screen

1#!/usr/bin/env python2# -*- coding: utf-8 -*-3from __future__ import absolute_import4from __future__ import unicode_literals5from py_privatekonomi.utilities import helper6from py_privatekonomi.utilities.common import as_obj, is_string7from py_privatekonomi.utilities.proxy import HookProxy8from py_privatekonomi.utilities.models import rebuild_tables, create_tables9from py_privatekonomi.core import loader10from py_privatekonomi.core.error import (MissingAppFunctionError, FormatterError, ParserError)11import py_privatekonomi.core.db12from py_privatekonomi.core.mappers.economy_mapper import EconomyMapper13import copy14class AppProxy(HookProxy):15 def __init__(self, objname, obj):16 super(AppProxy, self).__init__(objname, obj)17 self.__called = []18 def _pre( self, name, *args, **kwargs ):19 self.__called.append(name)20 def _post(self, name, *args, **kwargs ):21 pass22 def _pre_run(self, *args, **kwargs ):23 if super(AppProxy, self).getObj().isBuilt() and len(self.__called) > 0:24 if len(self.__called) == 1 and self.__called[0] == 'run':25 return26 raise Exception("Following methods called without building first: %s" % (repr(self.__called)))27 def _post_run(self, *args, **kwargs):28 self.__called = []29 def _post_build(self, *args, **kwargs):30 self.__called = []31 def __repr__(self):32 return super(AppProxy, self).getObj().__repr__()33class App(object):34 def __init__(self):35 self.__formatter = None36 self.__parser = None37 self.__sources = []38 self.__persist = False39 self.__config = {}40 self.__db = {}41 self.__output = None42 self.__auto_discover = False43 self.__discover_from = None44 self.app = None45 def setFormatter(self, formatter_name):46 self.__formatter = formatter_name47 def setParser(self, parser_name):48 self.__parser = parser_name49 def addSource(self, source_name):50 self.__sources.append(source_name)51 def addSources(self, source_names):52 self.__sources.extend(source_names)53 def clearSources(self):54 self.__sources = []55 def persistWith(self, database_settings):56 self.__persist = True57 self.__db = database_settings58 def setOutput(self, output):59 """ Setting the output will avoid calling execute, but60 may still trigger a call to persist.61 If output is set, the building process will not consider62 formatter, parser, or source(s) """63 self.__output = output64 def autodiscover(self, discover_from):65 """ This will attempt to guess the formatter66 and parser given a list of parsers and formatters of format:67 discover_from = [68 {69 'formatter' : 'swedbank',70 'parser' : 'swedbank'71 }, { ... }72 ]73 """74 if self.__formatter is not None:75 raise Exception("Unable to autodiscover; Formatter is already set to: %s" % (self.__formatter))76 if self.__parser is not None:77 raise Exception("Unable to autodiscover; Parser is already set to: %s" % (self.__parser))78 self.__auto_discover = True79 self.__discover_from = discover_from80 def config(self, conf):81 self.__config.update(conf)82 def isBuilt(self):83 return self.app is not None84 def __set_parser(self, core):85 self.app['parser'] = None86 if self.__parser is not None:87 self.app['parser'] = self.__load_parser(self.__parser, core)88 def __set_formatter(self, core):89 self.app['formatter'] = None90 if self.__formatter is not None:91 self.app['formatter'] = self.__load_formatter(self.__formatter, core)92 def __load_formatter(self, formatter_name, core):93 return loader.load_formatter(formatter_name, core['factories']['formatters']['account_formatter_factory'])94 def __load_parser(self, parser_name, core):95 return loader.load_parser(parser_name, core['factories']['parsers']['account_parser_factory'])96 def __autodiscover(self, core):97 self.app['formatter'] = None98 self.app['parser'] = None99 for discover in self.__discover_from:100 formatter = self.__load_formatter(discover['formatter'], core)101 if formatter is not None:102 parser = self.__load_parser(discover['parser'], core)103 if parser is not None:104 self.app['formatter'] = formatter105 self.app['parser'] = parser106 return True107 return False108 def _rebuildTables(self, raw_models=None, customizations={}):109 _customizations = {}110 if 'customizations' in self.__config:111 _customizations = self.__config['customizations']112 apply_customizations = _customizations.copy()113 apply_customizations.update(customizations)114 if raw_models is None:115 raw_models = loader.load_models(EconomyMapper.getModelNames())116 return rebuild_tables(raw_models, apply_customizations)117 def _createTables(self, raw_models=None, customizations={}):118 _customizations = {}119 if 'customizations' in self.__config:120 _customizations = self.__config['customizations']121 apply_customizations = _customizations.copy()122 apply_customizations.update(customizations)123 if raw_models is None:124 raw_models = loader.load_models(EconomyMapper.getModelNames())125 return create_tables(raw_models, customizations)126 def build(self):127 self.app = {}128 core = loader.load_core()129 self.app['core'] = core130 if len(self.__db) > 0:131 self.__config['database'] = self.__db132 if self.__output is None and self.__auto_discover is False:133 if self.__formatter is None:134 raise Exception("Formatter has not been specified, please call setFormatter, setOutput, or autodiscover")135 if self.__parser is None:136 raise Exception("Parser has not been specified, please call setParser, setOutput, or autodiscover")137 if len(self.__sources) == 0:138 raise Exception("Sources have not been specified, please call addSource, setOutput, or autodiscover")139 self.__set_parser(self.app['core'])140 self.__set_formatter(self.app['core'])141 elif self.__auto_discover is True:142 found = self.__autodiscover(self.app['core'])143 if not found:144 raise Exception("Unable to find parser/formatter from %s" % (repr(self.__discover_from)))145 if self.__persist is True:146 if self.__parser is None or self.__formatter is None:147 raise Exception("persist requires formatter/parser to be set, please call setParser and/or setFormatter")148 try:149 customizations = loader.load_customizations(self.__parser)150 self.config({151 'customizations' : customizations152 })153 except ImportError as e:154 pass155 return self156 def run(self):157 def __execute():158 errors = []159 if self.__auto_discover is True:160 found = False161 for discover in self.__discover_from:162 try:163 parser = self.__load_parser(discover['parser'], self.app['core'])164 formatter = self.__load_formatter(discover['formatter'], self.app['core'])165 if parser is None or formatter is None:166 continue167 output = self.execute(168 sources=self.__sources,169 parser=parser,170 formatter=formatter,171 configs=as_obj(self.__config))172 self.app['parser'] = discover['parser']173 self.app['formatter'] = discover['formatter']174 found = True175 break176 except FormatterError as e:177 e_ = repr(e)178 e_ += "(%s)" % (repr(self.__sources))179 errors.append(e_)180 continue181 except ParserError as e:182 e_ = repr(e)183 e_ += "(%s)" % (repr(self.__sources))184 errors.append(e_)185 continue186 if found is False:187 raise Exception("Unable to parse/format using available parsers and formatters from %s (errors=%s)" % (repr(self.__discover_from), repr(errors)))188 else:189 output = self.execute(self.__sources, self.app['parser'], self.app['formatter'], as_obj(self.__config))190 return output191 if self.app is None:192 raise Exception("Build app using app.build() before running.")193 ret = {}194 if self.__output is not None:195 ret['execute'] = self.__output196 ret['formatter'] = 'unknown'197 ret['parser'] = 'unknown'198 else:199 if 'execute' not in dir(self):200 raise MissingAppFunctionError(capture_data={201 'fun_name' : 'execute',202 'app' : self.app203 })204 ret['execute'] = __execute()205 if not is_string(self.app['formatter']):206 ret['formatter'] = self.app['formatter'].getName()207 else:208 ret['formatter'] = self.app['formatter']209 if not is_string(self.app['parser']):210 ret['parser'] = self.app['parser'].getName()211 else:212 ret['parser'] = self.app['parser']213 if self.__persist is True:214 if 'persist' not in dir(self):215 raise MissingAppFunctionError(capture_data={216 'fun_name' : 'persist',217 'app' : self.app218 })219 try:220 py_privatekonomi.core.db.DB().connect(self.__config['database'])221 except AttributeError as e:222 raise Exception("Unable to connect to database: inaccurate database settings.", e)223 ret['persist'] = self.persist(ret['execute'], as_obj(self.__config))224 return ret225 def clear_password(self, v):226 if isinstance(v, dict):227 if "password" in v:228 del v["password"]229 v['password'] = '<censored>'230 for ele in v.values():231 self.clear_password(ele)232 def __repr__(self):233 config = self.__config234 config = copy.deepcopy(config)235 self.clear_password(config)...

Full Screen

Full Screen

incident.py

Source:incident.py Github

copy

Full Screen

1from time import time, gmtime, strftime2from pdpyras import APISession3def incident_iter_selected(api_key, duration, service_ids, integrations, all_tags):4 api_session = APISession(api_key)5 durations = {"0": 30, "1": 60, "2": 90, "3": 120, "4": 150, "5": 180, "6": 210, "7": 240, "8": 270, "9": 300,6 "10": 330, "11": 360, "12": 440, "13": 720, "14": 900, "15": 1080}7 incidents = get_incidents(durations[duration], api_session, service_ids, integrations, all_tags)8 print("Found %s for Service %s for %s months with integration: %s" % (\9 str(len(incidents)), service_ids[0], str(int(duration) + 1), integrations))10 return incidents11def get_incidents(duration, api_session, service_ids, integrations, all_tags):12 incidents = []13 for i in range(30, duration + 30, 30):14 disco_param = discovery_params(i, service_ids)15 temp_incidents = iter_incidents(api_session, disco_param, integrations, all_tags)16 incidents = incidents + temp_incidents17 return incidents18def discovery_params(i, services):19 current_window = i + 5 if i == 360 else i20 time_today = int(time()) - ((i - 30) * 24 * 60 * 60)21 time_to = int(time()) - (current_window * 24 * 60 * 60)22 discover_from = strftime('%Y-%m-%dT%H:%M:%S-00', gmtime(time_today))23 discover_to = strftime('%Y-%m-%dT%H:%M:%S-00', gmtime(time_to))24 window_param = {'since': discover_to, 'until': discover_from, 'service_ids[]': [services], 'time_zone': 'UTC',25 'include[]': ['first_trigger_log_entries']}26 return window_param27def iter_incidents(api_session, window_param, integrations, all_tags):28 all_incidents = []29 ignored = []30 count = 031 # Making PagerDuty API calls for Incidents in this section32 for current_incident in api_session.iter_all('incidents', params=window_param, paginate=True):33 temp_incident = current_incident34 alerts = api_session.rget('incidents/%s/alerts' % current_incident['id'])35 try:36 temp_incident["all_alerts"] = alerts["alerts"]37 except TypeError:38 temp_incident["all_alerts"] = alerts39 if "[REDACTED] by" in current_incident['description']:40 continue41 ftle_channel = current_incident["first_trigger_log_entry"]["channel"]42 # print('--------\n',integrations)43 # print(ftle_channel['details'], '\n')44 if integrations.lower() == 'datadog' and 'tags' in ftle_channel['details']:45 tags = current_incident["first_trigger_log_entry"]["channel"]["details"]["tags"]46 elif integrations.lower() == 'dynatrace' and 'Tags' in ftle_channel['details']:47 tags = current_incident["first_trigger_log_entry"]["channel"]["details"]["Tags"]48 elif integrations.lower() == 'nagios' and 'host' in ftle_channel:49 tags = 'hostname:' + current_incident["first_trigger_log_entry"]["channel"]["host"]50 elif integrations.lower() == 'checkmk' and 'host' in ftle_channel:51 tags = current_incident["first_trigger_log_entry"]["channel"]["host"]52 else:53 tags = ""54 if tags:55 extract_tags(tags, all_tags)56 temp_incident["tags"] = tags57 temp_incident["integration"] = integrations58 all_incidents.append(temp_incident)59 return all_incidents60def extract_tags(current_tags, all_tags):61 first_layer_tags = current_tags.split(",")62 if "untagged" not in all_tags:63 all_tags["untagged"] = []64 for tag in first_layer_tags:65 tag_extract = tag.strip().replace(" ", "_").split(":")66 if len(tag_extract) == 1 and tag_extract[0] not in all_tags["untagged"]:67 all_tags["untagged"].append(tag_extract[0])68 elif len(tag_extract) == 2:69 if tag_extract[0] not in all_tags:70 all_tags[tag_extract[0]] = [tag_extract[1]]71 elif tag_extract[1] not in all_tags[tag_extract[0]]:72 all_tags[tag_extract[0]].append(tag_extract[1])73 elif len(tag_extract) == 3:74 if tag_extract[0] not in all_tags:75 all_tags[tag_extract[0]] = [tag_extract[1] + tag_extract[2]]76 elif tag_extract[1] + tag_extract[2] not in all_tags[tag_extract[0]]:77 all_tags[tag_extract[0]].append(tag_extract[1] + tag_extract[2])78# api_key = "ozhUyFftDxYFTR2rsVWQ"79# service_ids = ['PG8L64X', 'PYZQ56E']80# integrations = ['Datadog', 'Dynatrace']81# durations = ['5', '9', '10']82# print("---GETTING INCIDENTS---")83# incidents = []84# tags = {}85# for i in range(len(service_ids)):86# print("---Getting incidents for %s in %s month(s)---" % (service_ids[i], str(int(durations[i]) + 1)))87# incidents += incident_iter_selected(api_key, durations[i], service_ids[i], integrations[i], tags)88#89# print("---GOT INCIDENTS---")90#91# count = 192# for incident in incidents:93# print(count, incident)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Testify automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful